mirror of https://github.com/status-im/consul.git
testing/deployer: support tproxy in v2 for dataplane (#19094)
This updates the testing/deployer (aka "topology test") framework to allow for a v2-oriented topology to opt services into enabling TransparentProxy. The restrictions are similar to that of #19046 The multiport Ports map that was added in #19046 was changed to allow for the protocol to be specified at this time, but for now the only supported protocol is TCP as only L4 functions currently on main. As part of making transparent proxy work, the DNS server needed a new zonefile for responding to virtual.consul requests, since there is no Kubernetes DNS and the Consul DNS work for v2 has not happened yet. Once Consul DNS supports v2 we should switch over. For now the format of queries is: <service>--<namespace>--<partition>.virtual.consul Additionally: - All transparent proxy enabled services are assigned a virtual ip in the 10.244.0/24 range. This is something Consul will do in v2 at a later date, likely during 1.18. - All services with exposed ports (non-mesh) are assigned a virtual port number for use with tproxy - The consul-dataplane image has been made un-distroless, and gotten the necessary tools to execute consul connect redirect-traffic before running dataplane, thus simulating a kubernetes init container in plain docker.
This commit is contained in:
parent
aaac20f4a8
commit
4b85aa5a97
|
@ -5,7 +5,6 @@ package catalogv2
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1"
|
||||
|
@ -18,7 +17,7 @@ import (
|
|||
"github.com/hashicorp/consul/test-integ/topoutil"
|
||||
)
|
||||
|
||||
// TestBasicL4ExplicitDestination sets up the following:
|
||||
// TestBasicL4ExplicitDestinations sets up the following:
|
||||
//
|
||||
// - 1 cluster (no peering / no wanfed)
|
||||
// - 3 servers in that cluster
|
||||
|
@ -37,8 +36,8 @@ import (
|
|||
// - part1/default
|
||||
// - default/nsa
|
||||
// - part1/nsa
|
||||
func TestBasicL4ExplicitDestination(t *testing.T) {
|
||||
cfg := testBasicL4ExplicitDestinationCreator{}.NewConfig(t)
|
||||
func TestBasicL4ExplicitDestinations(t *testing.T) {
|
||||
cfg := testBasicL4ExplicitDestinationsCreator{}.NewConfig(t)
|
||||
|
||||
sp := sprawltest.Launch(t, cfg)
|
||||
|
||||
|
@ -69,20 +68,11 @@ func TestBasicL4ExplicitDestination(t *testing.T) {
|
|||
for _, ship := range ships {
|
||||
t.Run("relationship: "+ship.String(), func(t *testing.T) {
|
||||
var (
|
||||
svc = ship.Caller
|
||||
u = ship.Upstream
|
||||
clusterPrefix string
|
||||
svc = ship.Caller
|
||||
u = ship.Upstream
|
||||
)
|
||||
|
||||
if u.Peer == "" {
|
||||
if u.ID.PartitionOrDefault() == "default" {
|
||||
clusterPrefix = strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.Cluster, "internal"}, ".")
|
||||
} else {
|
||||
clusterPrefix = strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.ID.Partition, u.Cluster, "internal-v1"}, ".")
|
||||
}
|
||||
} else {
|
||||
clusterPrefix = strings.Join([]string{u.ID.Name, u.ID.Namespace, u.Peer, "external"}, ".")
|
||||
}
|
||||
clusterPrefix := clusterPrefixForUpstream(u)
|
||||
|
||||
asserter.UpstreamEndpointStatus(t, svc, clusterPrefix+".", "HEALTHY", 1)
|
||||
asserter.HTTPServiceEchoes(t, svc, u.LocalPort, "")
|
||||
|
@ -91,9 +81,9 @@ func TestBasicL4ExplicitDestination(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type testBasicL4ExplicitDestinationCreator struct{}
|
||||
type testBasicL4ExplicitDestinationsCreator struct{}
|
||||
|
||||
func (c testBasicL4ExplicitDestinationCreator) NewConfig(t *testing.T) *topology.Config {
|
||||
func (c testBasicL4ExplicitDestinationsCreator) NewConfig(t *testing.T) *topology.Config {
|
||||
const clusterName = "dc1"
|
||||
|
||||
servers := topoutil.NewTopologyServerSet(clusterName+"-server", 3, []string{clusterName, "wan"}, nil)
|
||||
|
@ -129,7 +119,7 @@ func (c testBasicL4ExplicitDestinationCreator) NewConfig(t *testing.T) *topology
|
|||
}
|
||||
}
|
||||
|
||||
func (c testBasicL4ExplicitDestinationCreator) topologyConfigAddNodes(
|
||||
func (c testBasicL4ExplicitDestinationsCreator) topologyConfigAddNodes(
|
||||
t *testing.T,
|
||||
cluster *topology.Cluster,
|
||||
nodeName func() string,
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package catalogv2
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/consul/testing/deployer/topology"
|
||||
)
|
||||
|
||||
func clusterPrefixForUpstream(u *topology.Upstream) string {
|
||||
if u.Peer == "" {
|
||||
if u.ID.PartitionOrDefault() == "default" {
|
||||
return strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.Cluster, "internal"}, ".")
|
||||
} else {
|
||||
return strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.ID.Partition, u.Cluster, "internal-v1"}, ".")
|
||||
}
|
||||
} else {
|
||||
return strings.Join([]string{u.ID.Name, u.ID.Namespace, u.Peer, "external"}, ".")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,214 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package catalogv2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
|
||||
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||
"github.com/hashicorp/consul/testing/deployer/sprawl/sprawltest"
|
||||
"github.com/hashicorp/consul/testing/deployer/topology"
|
||||
|
||||
"github.com/hashicorp/consul/test-integ/topoutil"
|
||||
)
|
||||
|
||||
// TestBasicL4ImplicitDestinations sets up the following:
|
||||
//
|
||||
// - 1 cluster (no peering / no wanfed)
|
||||
// - 3 servers in that cluster
|
||||
// - v2 arch is activated
|
||||
// - for each tenancy, only using v2 constructs:
|
||||
// - a server exposing 2 tcp ports
|
||||
// - a client with transparent proxy enabled and no explicit upstreams
|
||||
// - a traffic permission granting the client access to the service on all ports
|
||||
//
|
||||
// When this test is executed in CE it will only use the default/default
|
||||
// tenancy.
|
||||
//
|
||||
// When this test is executed in Enterprise it will additionally test the same
|
||||
// things within these tenancies:
|
||||
//
|
||||
// - part1/default
|
||||
// - default/nsa
|
||||
// - part1/nsa
|
||||
func TestBasicL4ImplicitDestinations(t *testing.T) {
|
||||
cfg := testBasicL4ImplicitDestinationsCreator{}.NewConfig(t)
|
||||
|
||||
sp := sprawltest.Launch(t, cfg)
|
||||
|
||||
var (
|
||||
asserter = topoutil.NewAsserter(sp)
|
||||
|
||||
topo = sp.Topology()
|
||||
cluster = topo.Clusters["dc1"]
|
||||
|
||||
ships = topo.ComputeRelationships()
|
||||
)
|
||||
|
||||
clientV2 := sp.ResourceServiceClientForCluster(cluster.Name)
|
||||
|
||||
t.Log(topology.RenderRelationships(ships))
|
||||
|
||||
// Make sure things are truly in v2 not v1.
|
||||
for _, name := range []string{
|
||||
"static-server",
|
||||
"static-client",
|
||||
} {
|
||||
libassert.CatalogV2ServiceHasEndpointCount(t, clientV2, name, nil, 1)
|
||||
}
|
||||
|
||||
// Check relationships
|
||||
for _, ship := range ships {
|
||||
t.Run("relationship: "+ship.String(), func(t *testing.T) {
|
||||
var (
|
||||
svc = ship.Caller
|
||||
u = ship.Upstream
|
||||
)
|
||||
|
||||
clusterPrefix := clusterPrefixForUpstream(u)
|
||||
|
||||
asserter.UpstreamEndpointStatus(t, svc, clusterPrefix+".", "HEALTHY", 1)
|
||||
if u.LocalPort > 0 {
|
||||
asserter.HTTPServiceEchoes(t, svc, u.LocalPort, "")
|
||||
}
|
||||
asserter.FortioFetch2FortioName(t, svc, u, cluster.Name, u.ID)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type testBasicL4ImplicitDestinationsCreator struct{}
|
||||
|
||||
func (c testBasicL4ImplicitDestinationsCreator) NewConfig(t *testing.T) *topology.Config {
|
||||
const clusterName = "dc1"
|
||||
|
||||
servers := topoutil.NewTopologyServerSet(clusterName+"-server", 3, []string{clusterName, "wan"}, nil)
|
||||
|
||||
cluster := &topology.Cluster{
|
||||
Enterprise: utils.IsEnterprise(),
|
||||
Name: clusterName,
|
||||
Nodes: servers,
|
||||
}
|
||||
|
||||
lastNode := 0
|
||||
nodeName := func() string {
|
||||
lastNode++
|
||||
return fmt.Sprintf("%s-box%d", clusterName, lastNode)
|
||||
}
|
||||
|
||||
c.topologyConfigAddNodes(t, cluster, nodeName, "default", "default")
|
||||
if cluster.Enterprise {
|
||||
c.topologyConfigAddNodes(t, cluster, nodeName, "part1", "default")
|
||||
c.topologyConfigAddNodes(t, cluster, nodeName, "part1", "nsa")
|
||||
c.topologyConfigAddNodes(t, cluster, nodeName, "default", "nsa")
|
||||
}
|
||||
|
||||
return &topology.Config{
|
||||
Images: topoutil.TargetImages(),
|
||||
Networks: []*topology.Network{
|
||||
{Name: clusterName},
|
||||
{Name: "wan", Type: "wan"},
|
||||
},
|
||||
Clusters: []*topology.Cluster{
|
||||
cluster,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c testBasicL4ImplicitDestinationsCreator) topologyConfigAddNodes(
|
||||
t *testing.T,
|
||||
cluster *topology.Cluster,
|
||||
nodeName func() string,
|
||||
partition,
|
||||
namespace string,
|
||||
) {
|
||||
clusterName := cluster.Name
|
||||
|
||||
newServiceID := func(name string) topology.ServiceID {
|
||||
return topology.ServiceID{
|
||||
Partition: partition,
|
||||
Namespace: namespace,
|
||||
Name: name,
|
||||
}
|
||||
}
|
||||
|
||||
tenancy := &pbresource.Tenancy{
|
||||
Partition: partition,
|
||||
Namespace: namespace,
|
||||
PeerName: "local",
|
||||
}
|
||||
|
||||
serverNode := &topology.Node{
|
||||
Kind: topology.NodeKindDataplane,
|
||||
Version: topology.NodeVersionV2,
|
||||
Partition: partition,
|
||||
Name: nodeName(),
|
||||
Services: []*topology.Service{
|
||||
topoutil.NewFortioServiceWithDefaults(
|
||||
clusterName,
|
||||
newServiceID("static-server"),
|
||||
topology.NodeVersionV2,
|
||||
func(svc *topology.Service) {
|
||||
svc.EnableTransparentProxy = true
|
||||
},
|
||||
),
|
||||
},
|
||||
}
|
||||
clientNode := &topology.Node{
|
||||
Kind: topology.NodeKindDataplane,
|
||||
Version: topology.NodeVersionV2,
|
||||
Partition: partition,
|
||||
Name: nodeName(),
|
||||
Services: []*topology.Service{
|
||||
topoutil.NewFortioServiceWithDefaults(
|
||||
clusterName,
|
||||
newServiceID("static-client"),
|
||||
topology.NodeVersionV2,
|
||||
func(svc *topology.Service) {
|
||||
svc.EnableTransparentProxy = true
|
||||
svc.ImpliedUpstreams = []*topology.Upstream{
|
||||
{
|
||||
ID: newServiceID("static-server"),
|
||||
PortName: "http",
|
||||
},
|
||||
{
|
||||
ID: newServiceID("static-server"),
|
||||
PortName: "http-alt",
|
||||
},
|
||||
}
|
||||
},
|
||||
),
|
||||
},
|
||||
}
|
||||
trafficPerms := sprawltest.MustSetResourceData(t, &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Type: pbauth.TrafficPermissionsType,
|
||||
Name: "static-server-perms",
|
||||
Tenancy: tenancy,
|
||||
},
|
||||
}, &pbauth.TrafficPermissions{
|
||||
Destination: &pbauth.Destination{
|
||||
IdentityName: "static-server",
|
||||
},
|
||||
Action: pbauth.Action_ACTION_ALLOW,
|
||||
Permissions: []*pbauth.Permission{{
|
||||
Sources: []*pbauth.Source{{
|
||||
IdentityName: "static-client",
|
||||
Namespace: namespace,
|
||||
}},
|
||||
}},
|
||||
})
|
||||
|
||||
cluster.Nodes = append(cluster.Nodes,
|
||||
clientNode,
|
||||
serverNode,
|
||||
)
|
||||
|
||||
cluster.InitialResources = append(cluster.InitialResources,
|
||||
trafficPerms,
|
||||
)
|
||||
}
|
|
@ -11,14 +11,13 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
|
||||
"github.com/hashicorp/consul/testing/deployer/topology"
|
||||
"github.com/hashicorp/go-cleanhttp"
|
||||
"github.com/itchyny/gojq"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
|
||||
)
|
||||
|
||||
var ac3SvcDefaultsSuites []sharedTopoSuite = []sharedTopoSuite{
|
||||
|
@ -185,7 +184,7 @@ func (s *ac3SvcDefaultsSuite) test(t *testing.T, ct *commonTopo) {
|
|||
// TODO: what is default? namespace? partition?
|
||||
clusterName := fmt.Sprintf("%s.default.%s.external", s.upstream.ID.Name, s.upstream.Peer)
|
||||
nonceStatus := http.StatusInsufficientStorage
|
||||
url507 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", svcClient.ExposedPort,
|
||||
url507 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", svcClient.ExposedPort(""),
|
||||
url.QueryEscape(fmt.Sprintf("http://localhost:%d/?status=%d", s.upstream.LocalPort, nonceStatus)),
|
||||
)
|
||||
|
||||
|
@ -221,7 +220,7 @@ func (s *ac3SvcDefaultsSuite) test(t *testing.T, ct *commonTopo) {
|
|||
require.True(r, resultAsBool)
|
||||
})
|
||||
|
||||
url200 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", svcClient.ExposedPort,
|
||||
url200 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", svcClient.ExposedPort(""),
|
||||
url.QueryEscape(fmt.Sprintf("http://localhost:%d/", s.upstream.LocalPort)),
|
||||
)
|
||||
retry.RunWith(&retry.Timer{Timeout: time.Minute * 1, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
|
||||
|
|
|
@ -180,11 +180,11 @@ func (s *ac4ProxyDefaultsSuite) test(t *testing.T, ct *commonTopo) {
|
|||
})
|
||||
|
||||
t.Run("HTTP service fails due to connection timeout", func(t *testing.T) {
|
||||
url504 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", client.ExposedPort,
|
||||
url504 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", client.ExposedPort(""),
|
||||
url.QueryEscape(fmt.Sprintf("http://localhost:%d/?delay=1000ms", s.upstream.LocalPort)),
|
||||
)
|
||||
|
||||
url200 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", client.ExposedPort,
|
||||
url200 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", client.ExposedPort(""),
|
||||
url.QueryEscape(fmt.Sprintf("http://localhost:%d/", s.upstream.LocalPort)),
|
||||
)
|
||||
|
||||
|
|
|
@ -234,10 +234,21 @@ func (a *Asserter) fortioFetch2Upstream(
|
|||
) (body []byte, res *http.Response) {
|
||||
t.Helper()
|
||||
|
||||
// TODO: fortioSvc.ID.Normalize()? or should that be up to the caller?
|
||||
var actualURL string
|
||||
if upstream.Implied {
|
||||
actualURL = fmt.Sprintf("http://%s--%s--%s.virtual.consul:%d/%s",
|
||||
upstream.ID.Name,
|
||||
upstream.ID.Namespace,
|
||||
upstream.ID.Partition,
|
||||
upstream.VirtualPort,
|
||||
path,
|
||||
)
|
||||
} else {
|
||||
actualURL = fmt.Sprintf("http://localhost:%d/%s", upstream.LocalPort, path)
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("http://%s/fortio/fetch2?url=%s", addr,
|
||||
url.QueryEscape(fmt.Sprintf("http://localhost:%d/%s", upstream.LocalPort, path)),
|
||||
url.QueryEscape(actualURL),
|
||||
)
|
||||
|
||||
req, err := http.NewRequest(http.MethodPost, url, nil)
|
||||
|
@ -246,6 +257,7 @@ func (a *Asserter) fortioFetch2Upstream(
|
|||
res, err = client.Do(req)
|
||||
require.NoError(t, err)
|
||||
defer res.Body.Close()
|
||||
|
||||
// not sure when these happen, suspect it's when the mesh gateway in the peer is not yet ready
|
||||
require.NotEqual(t, http.StatusServiceUnavailable, res.StatusCode)
|
||||
require.NotEqual(t, http.StatusGatewayTimeout, res.StatusCode)
|
||||
|
@ -281,7 +293,13 @@ func (a *Asserter) FortioFetch2HeaderEcho(t *testing.T, fortioSvc *topology.Serv
|
|||
// similar to libassert.AssertFortioName,
|
||||
// uses the /fortio/fetch2 endpoint to hit the debug endpoint on the upstream,
|
||||
// and assert that the FORTIO_NAME == name
|
||||
func (a *Asserter) FortioFetch2FortioName(t *testing.T, fortioSvc *topology.Service, upstream *topology.Upstream, clusterName string, sid topology.ServiceID) {
|
||||
func (a *Asserter) FortioFetch2FortioName(
|
||||
t *testing.T,
|
||||
fortioSvc *topology.Service,
|
||||
upstream *topology.Upstream,
|
||||
clusterName string,
|
||||
sid topology.ServiceID,
|
||||
) {
|
||||
t.Helper()
|
||||
|
||||
var (
|
||||
|
@ -295,6 +313,7 @@ func (a *Asserter) FortioFetch2FortioName(t *testing.T, fortioSvc *topology.Serv
|
|||
|
||||
retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
|
||||
body, res := a.fortioFetch2Upstream(r, client, addr, upstream, path)
|
||||
|
||||
require.Equal(r, http.StatusOK, res.StatusCode)
|
||||
|
||||
// TODO: not sure we should retry these?
|
||||
|
|
|
@ -41,10 +41,14 @@ func NewFortioServiceWithDefaults(
|
|||
}
|
||||
|
||||
if nodeVersion == topology.NodeVersionV2 {
|
||||
svc.Ports = map[string]int{
|
||||
"http": httpPort,
|
||||
"http-alt": httpPort,
|
||||
"grpc": grpcPort,
|
||||
svc.Ports = map[string]*topology.Port{
|
||||
// TODO(rb/v2): once L7 works in v2 switch these back
|
||||
"http": {Number: httpPort, Protocol: "tcp"},
|
||||
"http-alt": {Number: httpPort, Protocol: "tcp"},
|
||||
"grpc": {Number: grpcPort, Protocol: "tcp"},
|
||||
// "http": {Number: httpPort, Protocol: "http"},
|
||||
// "http-alt": {Number: httpPort, Protocol: "http"},
|
||||
// "grpc": {Number: grpcPort, Protocol: "grpc"},
|
||||
}
|
||||
} else {
|
||||
svc.Port = httpPort
|
||||
|
|
|
@ -118,7 +118,7 @@ func AssertUpstreamEndpointStatusWithClient(
|
|||
clusterName, healthStatus)
|
||||
results, err := utils.JQFilter(clusters, filter)
|
||||
require.NoErrorf(r, err, "could not find cluster name %q: %v \n%s", clusterName, err, clusters)
|
||||
require.Len(r, results, 1) // the final part of the pipeline is "length" which only ever returns 1 result
|
||||
require.Len(r, results, 1, "clusters: "+clusters) // the final part of the pipeline is "length" which only ever returns 1 result
|
||||
|
||||
result, err := strconv.Atoi(results[0])
|
||||
assert.NoError(r, err)
|
||||
|
|
|
@ -63,7 +63,6 @@ func CatalogV2ServiceDoesNotExist(t *testing.T, client pbresource.ResourceServic
|
|||
// number of workload endpoints.
|
||||
func CatalogV2ServiceHasEndpointCount(t *testing.T, client pbresource.ResourceServiceClient, svc string, tenancy *pbresource.Tenancy, count int) {
|
||||
t.Helper()
|
||||
require.False(t, count == 0)
|
||||
|
||||
ctx := testutil.TestContext(t)
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
|
|
@ -196,8 +196,9 @@ func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster
|
|||
if node.IsV2() {
|
||||
pending := serviceInstanceToResources(node, svc)
|
||||
|
||||
if _, ok := identityInfo[svc.ID]; !ok {
|
||||
identityInfo[svc.ID] = pending.WorkloadIdentity
|
||||
workloadID := topology.NewServiceID(svc.WorkloadIdentity, svc.ID.Namespace, svc.ID.Partition)
|
||||
if _, ok := identityInfo[workloadID]; !ok {
|
||||
identityInfo[workloadID] = pending.WorkloadIdentity
|
||||
}
|
||||
|
||||
// Write workload
|
||||
|
@ -230,6 +231,15 @@ func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster
|
|||
return err
|
||||
}
|
||||
}
|
||||
if pending.ProxyConfiguration != nil {
|
||||
res, err := pending.ProxyConfiguration.Build()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.ProxyConfiguration.Resource.Id), err)
|
||||
}
|
||||
if _, err := s.writeResource(cluster, res); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if err := s.registerCatalogServiceV1(cluster, node, svc); err != nil {
|
||||
return fmt.Errorf("error registering service: %w", err)
|
||||
|
@ -268,6 +278,7 @@ func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster
|
|||
},
|
||||
Data: svcData,
|
||||
}
|
||||
|
||||
res, err := svcInfo.Build()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(svcInfo.Resource.Id), err)
|
||||
|
@ -482,10 +493,11 @@ func (r *Resource[V]) Build() (*pbresource.Resource, error) {
|
|||
}
|
||||
|
||||
type ServiceResources struct {
|
||||
Workload *Resource[*pbcatalog.Workload]
|
||||
HealthStatuses []*Resource[*pbcatalog.HealthStatus]
|
||||
Destinations *Resource[*pbmesh.Destinations]
|
||||
WorkloadIdentity *Resource[*pbauth.WorkloadIdentity]
|
||||
Workload *Resource[*pbcatalog.Workload]
|
||||
HealthStatuses []*Resource[*pbcatalog.HealthStatus]
|
||||
Destinations *Resource[*pbmesh.Destinations]
|
||||
WorkloadIdentity *Resource[*pbauth.WorkloadIdentity]
|
||||
ProxyConfiguration *Resource[*pbmesh.ProxyConfiguration]
|
||||
}
|
||||
|
||||
func serviceInstanceToResources(
|
||||
|
@ -506,8 +518,8 @@ func serviceInstanceToResources(
|
|||
)
|
||||
for name, port := range svc.Ports {
|
||||
wlPorts[name] = &pbcatalog.WorkloadPort{
|
||||
Port: uint32(port),
|
||||
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
|
||||
Port: uint32(port.Number),
|
||||
Protocol: port.ActualProtocol,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -534,21 +546,20 @@ func serviceInstanceToResources(
|
|||
},
|
||||
},
|
||||
}
|
||||
|
||||
worloadIdentityRes = &Resource[*pbauth.WorkloadIdentity]{
|
||||
workloadIdentityRes = &Resource[*pbauth.WorkloadIdentity]{
|
||||
Resource: &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Type: pbauth.WorkloadIdentityType,
|
||||
Name: svc.ID.Name,
|
||||
Name: svc.WorkloadIdentity,
|
||||
Tenancy: tenancy,
|
||||
},
|
||||
Metadata: svc.Meta,
|
||||
},
|
||||
Data: &pbauth.WorkloadIdentity{},
|
||||
}
|
||||
|
||||
healthResList []*Resource[*pbcatalog.HealthStatus]
|
||||
destinationsRes *Resource[*pbmesh.Destinations]
|
||||
proxyConfigRes *Resource[*pbmesh.ProxyConfiguration]
|
||||
)
|
||||
|
||||
if svc.HasCheck() {
|
||||
|
@ -577,11 +588,6 @@ func serviceInstanceToResources(
|
|||
}
|
||||
|
||||
if !svc.DisableServiceMesh {
|
||||
workloadRes.Data.Ports["mesh"] = &pbcatalog.WorkloadPort{
|
||||
Port: uint32(svc.EnvoyPublicListenerPort),
|
||||
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
|
||||
}
|
||||
|
||||
destinationsRes = &Resource[*pbmesh.Destinations]{
|
||||
Resource: &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
|
@ -615,13 +621,32 @@ func serviceInstanceToResources(
|
|||
}
|
||||
destinationsRes.Data.Destinations = append(destinationsRes.Data.Destinations, dest)
|
||||
}
|
||||
|
||||
if svc.EnableTransparentProxy {
|
||||
proxyConfigRes = &Resource[*pbmesh.ProxyConfiguration]{
|
||||
Resource: &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Type: pbmesh.ProxyConfigurationType,
|
||||
Name: svc.Workload,
|
||||
Tenancy: tenancy,
|
||||
},
|
||||
},
|
||||
Data: &pbmesh.ProxyConfiguration{
|
||||
Workloads: selector,
|
||||
DynamicConfig: &pbmesh.DynamicConfig{
|
||||
Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &ServiceResources{
|
||||
Workload: workloadRes,
|
||||
HealthStatuses: healthResList,
|
||||
Destinations: destinationsRes,
|
||||
WorkloadIdentity: worloadIdentityRes,
|
||||
Workload: workloadRes,
|
||||
HealthStatuses: healthResList,
|
||||
Destinations: destinationsRes,
|
||||
WorkloadIdentity: workloadIdentityRes,
|
||||
ProxyConfiguration: proxyConfigRes,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@ func (s *Sprawl) PrintDetails() error {
|
|||
} else {
|
||||
ports := make(map[string]int)
|
||||
for name, port := range svc.Ports {
|
||||
ports[name] = node.ExposedPort(port)
|
||||
ports[name] = node.ExposedPort(port.Number)
|
||||
}
|
||||
cd.Apps = append(cd.Apps, appDetail{
|
||||
Type: "app",
|
||||
|
|
|
@ -35,6 +35,64 @@ USER 100:0
|
|||
ENTRYPOINT []
|
||||
`
|
||||
|
||||
const dockerfileDataplaneForTProxy = `
|
||||
ARG DATAPLANE_IMAGE
|
||||
ARG CONSUL_IMAGE
|
||||
FROM ${CONSUL_IMAGE} AS consul
|
||||
FROM ${DATAPLANE_IMAGE} AS distroless
|
||||
FROM debian:bullseye-slim
|
||||
|
||||
# undo the distroless aspect
|
||||
COPY --from=distroless /usr/local/bin/discover /usr/local/bin/
|
||||
COPY --from=distroless /usr/local/bin/envoy /usr/local/bin/
|
||||
COPY --from=distroless /usr/local/bin/consul-dataplane /usr/local/bin/
|
||||
COPY --from=distroless /licenses/copyright.txt /licenses/
|
||||
|
||||
COPY --from=consul /bin/consul /bin/
|
||||
|
||||
# Install iptables and sudo, needed for tproxy.
|
||||
RUN apt update -y \
|
||||
&& apt install -y iptables sudo curl dnsutils
|
||||
|
||||
RUN sed '/_apt/d' /etc/passwd > /etc/passwd.new \
|
||||
&& mv -f /etc/passwd.new /etc/passwd \
|
||||
&& adduser --uid=100 consul --no-create-home --disabled-password --system \
|
||||
&& adduser consul sudo \
|
||||
&& echo 'consul ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
|
||||
|
||||
COPY <<'EOF' /bin/tproxy-startup.sh
|
||||
#!/bin/sh
|
||||
|
||||
set -ex
|
||||
|
||||
# HACK: UID of consul in the consul-client container
|
||||
# This is conveniently also the UID of apt in the envoy container
|
||||
CONSUL_UID=100
|
||||
ENVOY_UID=$(id -u)
|
||||
|
||||
# - We allow 19000 so that the test can directly visit the envoy admin page.
|
||||
# - We allow 20000 so that envoy can receive mTLS traffic from other nodes.
|
||||
# - We (reluctantly) allow 8080 so that we can bypass envoy and talk to fortio
|
||||
# to do test assertions.
|
||||
sudo consul connect redirect-traffic \
|
||||
-proxy-uid $ENVOY_UID \
|
||||
-exclude-uid $CONSUL_UID \
|
||||
-proxy-inbound-port=15001 \
|
||||
-exclude-inbound-port=19000 \
|
||||
-exclude-inbound-port=20000 \
|
||||
-exclude-inbound-port=8080
|
||||
exec "$@"
|
||||
EOF
|
||||
|
||||
RUN chmod +x /bin/tproxy-startup.sh \
|
||||
&& chown 100:0 /bin/tproxy-startup.sh
|
||||
|
||||
RUN echo 'consul ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
|
||||
|
||||
USER 100:0
|
||||
ENTRYPOINT []
|
||||
`
|
||||
|
||||
func DockerImages(
|
||||
logger hclog.Logger,
|
||||
run *runner.Runner,
|
||||
|
@ -80,6 +138,25 @@ func DockerImages(
|
|||
|
||||
built[cdp] = struct{}{}
|
||||
}
|
||||
|
||||
cdpTproxy := n.Images.LocalDataplaneTProxyImage()
|
||||
if _, ok := built[cdpTproxy]; cdpTproxy != "" && !ok {
|
||||
logger.Info("building image", "image", cdpTproxy)
|
||||
err := run.DockerExec(context.TODO(), []string{
|
||||
"build",
|
||||
"--build-arg",
|
||||
"DATAPLANE_IMAGE=" + n.Images.Dataplane,
|
||||
"--build-arg",
|
||||
"CONSUL_IMAGE=" + n.Images.Consul,
|
||||
"-t", cdpTproxy,
|
||||
"-",
|
||||
}, logw, strings.NewReader(dockerfileDataplaneForTProxy))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
built[cdpTproxy] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,8 +8,11 @@ import (
|
|||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"github.com/hashicorp/consul/testing/deployer/topology"
|
||||
"github.com/hashicorp/consul/testing/deployer/util"
|
||||
)
|
||||
|
@ -63,17 +66,36 @@ func (g *Generator) writeCoreDNSFiles(net *topology.Network, dnsIPAddress string
|
|||
}
|
||||
}
|
||||
|
||||
// Until Consul DNS understands v2, simulate it.
|
||||
//
|
||||
// NOTE: this DNS is not quite what consul normally does. It's simpler
|
||||
// to simulate this format here.
|
||||
virtualNames := make(map[string][]string)
|
||||
for id, svcData := range cluster.Services {
|
||||
if len(svcData.VirtualIps) == 0 {
|
||||
continue
|
||||
}
|
||||
vips := svcData.VirtualIps
|
||||
|
||||
// <service>--<namespace>--<partition>.virtual.<domain>
|
||||
name := fmt.Sprintf("%s--%s--%s", id.Name, id.Namespace, id.Partition)
|
||||
virtualNames[name] = vips
|
||||
}
|
||||
|
||||
var (
|
||||
clusterDNSName = cluster.Name + "-consulcluster.lan"
|
||||
)
|
||||
virtualDNSName = "virtual.consul"
|
||||
|
||||
corefilePath := filepath.Join(rootdir, "Corefile")
|
||||
zonefilePath := filepath.Join(rootdir, "servers")
|
||||
corefilePath = filepath.Join(rootdir, "Corefile")
|
||||
zonefilePath = filepath.Join(rootdir, "servers")
|
||||
virtualZonefilePath = filepath.Join(rootdir, "virtual")
|
||||
)
|
||||
|
||||
_, err := UpdateFileIfDifferent(
|
||||
g.logger,
|
||||
generateCoreDNSConfigFile(
|
||||
clusterDNSName,
|
||||
virtualDNSName,
|
||||
addrs,
|
||||
),
|
||||
corefilePath,
|
||||
|
@ -105,7 +127,25 @@ func (g *Generator) writeCoreDNSFiles(net *topology.Network, dnsIPAddress string
|
|||
return false, nil, fmt.Errorf("error hashing %q: %w", zonefilePath, err)
|
||||
}
|
||||
|
||||
return true, []string{corefileHash, zonefileHash}, nil
|
||||
_, err = UpdateFileIfDifferent(
|
||||
g.logger,
|
||||
generateCoreDNSVirtualZoneFile(
|
||||
dnsIPAddress,
|
||||
virtualDNSName,
|
||||
virtualNames,
|
||||
),
|
||||
virtualZonefilePath,
|
||||
0644,
|
||||
)
|
||||
if err != nil {
|
||||
return false, nil, fmt.Errorf("error writing %q: %w", virtualZonefilePath, err)
|
||||
}
|
||||
virtualZonefileHash, err := util.HashFile(virtualZonefilePath)
|
||||
if err != nil {
|
||||
return false, nil, fmt.Errorf("error hashing %q: %w", virtualZonefilePath, err)
|
||||
}
|
||||
|
||||
return true, []string{corefileHash, zonefileHash, virtualZonefileHash}, nil
|
||||
}
|
||||
|
||||
return false, nil, nil
|
||||
|
@ -113,6 +153,7 @@ func (g *Generator) writeCoreDNSFiles(net *topology.Network, dnsIPAddress string
|
|||
|
||||
func generateCoreDNSConfigFile(
|
||||
clusterDNSName string,
|
||||
virtualDNSName string,
|
||||
addrs []string,
|
||||
) []byte {
|
||||
serverPart := ""
|
||||
|
@ -139,7 +180,14 @@ consul:53 {
|
|||
whoami
|
||||
}
|
||||
|
||||
%[2]s
|
||||
%[2]s:53 {
|
||||
file /config/virtual %[2]s
|
||||
log
|
||||
errors
|
||||
whoami
|
||||
}
|
||||
|
||||
%[3]s
|
||||
|
||||
.:53 {
|
||||
forward . 8.8.8.8:53
|
||||
|
@ -147,7 +195,7 @@ consul:53 {
|
|||
errors
|
||||
whoami
|
||||
}
|
||||
`, clusterDNSName, serverPart))
|
||||
`, clusterDNSName, virtualDNSName, serverPart))
|
||||
}
|
||||
|
||||
func generateCoreDNSZoneFile(
|
||||
|
@ -178,3 +226,38 @@ server IN A %s ; Consul server
|
|||
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
||||
func generateCoreDNSVirtualZoneFile(
|
||||
dnsIPAddress string,
|
||||
virtualDNSName string,
|
||||
nameToAddr map[string][]string,
|
||||
) []byte {
|
||||
var buf bytes.Buffer
|
||||
buf.WriteString(fmt.Sprintf(`
|
||||
$TTL 60
|
||||
$ORIGIN %[1]s.
|
||||
@ IN SOA ns.%[1]s. webmaster.%[1]s. (
|
||||
2017042745 ; serial
|
||||
7200 ; refresh (2 hours)
|
||||
3600 ; retry (1 hour)
|
||||
1209600 ; expire (2 weeks)
|
||||
3600 ; minimum (1 hour)
|
||||
)
|
||||
@ IN NS ns.%[1]s. ; Name server
|
||||
ns IN A %[2]s ; self
|
||||
`, virtualDNSName, dnsIPAddress))
|
||||
|
||||
names := maps.Keys(nameToAddr)
|
||||
sort.Strings(names)
|
||||
|
||||
for _, name := range names {
|
||||
vips := nameToAddr[name]
|
||||
for _, vip := range vips {
|
||||
buf.WriteString(fmt.Sprintf(`
|
||||
%s IN A %s ; Consul server
|
||||
`, name, vip))
|
||||
}
|
||||
}
|
||||
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
|
|
@ -122,8 +122,10 @@ func (s Step) String() string {
|
|||
}
|
||||
}
|
||||
|
||||
func (s Step) StartServers() bool { return s >= StepServers }
|
||||
func (s Step) StartAgents() bool { return s >= StepAgents }
|
||||
func (s Step) StartServers() bool { return s >= StepServers }
|
||||
|
||||
func (s Step) StartAgents() bool { return s >= StepAgents }
|
||||
|
||||
func (s Step) StartServices() bool { return s >= StepServices }
|
||||
|
||||
// func (s Step) InitiatePeering() bool { return s >= StepPeering }
|
||||
|
@ -260,6 +262,7 @@ func (g *Generator) Generate(step Step) error {
|
|||
addImage("", node.Images.Consul)
|
||||
addImage("", node.Images.EnvoyConsulImage())
|
||||
addImage("", node.Images.LocalDataplaneImage())
|
||||
addImage("", node.Images.LocalDataplaneTProxyImage())
|
||||
|
||||
if node.IsAgent() {
|
||||
addVolume(node.DockerName())
|
||||
|
|
|
@ -125,7 +125,11 @@ func (g *Generator) generateNodeContainers(
|
|||
var img string
|
||||
if node.IsDataplane() {
|
||||
tmpl = tfAppDataplaneT
|
||||
img = DockerImageResourceName(node.Images.LocalDataplaneImage())
|
||||
if svc.EnableTransparentProxy {
|
||||
img = DockerImageResourceName(node.Images.LocalDataplaneTProxyImage())
|
||||
} else {
|
||||
img = DockerImageResourceName(node.Images.LocalDataplaneImage())
|
||||
}
|
||||
} else {
|
||||
img = DockerImageResourceName(node.Images.EnvoyConsulImage())
|
||||
}
|
||||
|
|
|
@ -17,6 +17,13 @@ resource "docker_container" "{{.Node.DockerName}}-{{.Service.ID.TFString}}-sidec
|
|||
read_only = true
|
||||
}
|
||||
|
||||
{{ if .Service.EnableTransparentProxy }}
|
||||
capabilities {
|
||||
add = ["NET_ADMIN"]
|
||||
}
|
||||
entrypoint = [ "/bin/tproxy-startup.sh" ]
|
||||
{{ end }}
|
||||
|
||||
env = [
|
||||
"DP_CONSUL_ADDRESSES=server.{{.Node.Cluster}}-consulcluster.lan",
|
||||
{{ if .Node.IsV2 }}
|
||||
|
@ -39,6 +46,10 @@ resource "docker_container" "{{.Node.DockerName}}-{{.Service.ID.TFString}}-sidec
|
|||
"DP_CREDENTIAL_STATIC_TOKEN={{.Token}}",
|
||||
{{ end }}
|
||||
|
||||
{{ if .Service.EnableTransparentProxy }}
|
||||
"REDIRECT_TRAFFIC_ARGS=-exclude-inbound-port=19000",
|
||||
{{ end }}
|
||||
|
||||
// for demo purposes
|
||||
"DP_ENVOY_ADMIN_BIND_ADDRESS=0.0.0.0",
|
||||
"DP_ENVOY_ADMIN_BIND_PORT=19000",
|
||||
|
|
|
@ -317,6 +317,13 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error
|
|||
return nil, fmt.Errorf("cluster %q node %q has more than one public address", c.Name, n.Name)
|
||||
}
|
||||
|
||||
if n.IsDataplane() && len(n.Services) > 1 {
|
||||
// Our use of consul-dataplane here is supposed to mimic that
|
||||
// of consul-k8s, which ultimately has one IP per Service, so
|
||||
// we introduce the same limitation here.
|
||||
return nil, fmt.Errorf("cluster %q node %q uses dataplane, but has more than one service", c.Name, n.Name)
|
||||
}
|
||||
|
||||
seenServices := make(map[ServiceID]struct{})
|
||||
for _, svc := range n.Services {
|
||||
if n.IsAgent() {
|
||||
|
@ -387,7 +394,7 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error
|
|||
// return nil, fmt.Errorf("service has invalid protocol: %s", svc.Protocol)
|
||||
// }
|
||||
|
||||
for _, u := range svc.Upstreams {
|
||||
defaultUpstream := func(u *Upstream) error {
|
||||
// Default to that of the enclosing service.
|
||||
if u.Peer == "" {
|
||||
if u.ID.Partition == "" {
|
||||
|
@ -406,17 +413,43 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error
|
|||
|
||||
addTenancy(u.ID.Partition, u.ID.Namespace)
|
||||
|
||||
if u.LocalAddress == "" {
|
||||
// v1 defaults to 127.0.0.1 but v2 does not. Safe to do this generally though.
|
||||
u.LocalAddress = "127.0.0.1"
|
||||
if u.Implied {
|
||||
if u.PortName == "" {
|
||||
return fmt.Errorf("implicit upstreams must use port names in v2")
|
||||
}
|
||||
} else {
|
||||
if u.LocalAddress == "" {
|
||||
// v1 defaults to 127.0.0.1 but v2 does not. Safe to do this generally though.
|
||||
u.LocalAddress = "127.0.0.1"
|
||||
}
|
||||
if u.PortName != "" && n.IsV1() {
|
||||
return fmt.Errorf("explicit upstreams cannot use port names in v1")
|
||||
}
|
||||
if u.PortName == "" && n.IsV2() {
|
||||
// Assume this is a v1->v2 conversion and name it.
|
||||
u.PortName = "legacy"
|
||||
}
|
||||
}
|
||||
|
||||
if u.PortName != "" && n.IsV1() {
|
||||
return nil, fmt.Errorf("explicit upstreams cannot use port names in v1")
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, u := range svc.Upstreams {
|
||||
if err := defaultUpstream(u); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if u.PortName == "" && n.IsV2() {
|
||||
// Assume this is a v1->v2 conversion and name it.
|
||||
u.PortName = "legacy"
|
||||
}
|
||||
|
||||
if n.IsV2() {
|
||||
for _, u := range svc.ImpliedUpstreams {
|
||||
u.Implied = true
|
||||
if err := defaultUpstream(u); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if len(svc.ImpliedUpstreams) > 0 {
|
||||
return nil, fmt.Errorf("v1 does not support implied upstreams yet")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -424,31 +457,36 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error
|
|||
return nil, fmt.Errorf("cluster %q node %q service %q is not valid: %w", c.Name, n.Name, svc.ID.String(), err)
|
||||
}
|
||||
|
||||
if svc.EnableTransparentProxy && !n.IsDataplane() {
|
||||
return nil, fmt.Errorf("cannot enable tproxy on a non-dataplane node")
|
||||
}
|
||||
|
||||
if n.IsV2() {
|
||||
if implicitV2Services {
|
||||
svc.V2Services = []string{svc.ID.Name}
|
||||
|
||||
var svcPorts []*pbcatalog.ServicePort
|
||||
for name := range svc.Ports {
|
||||
for name, cfg := range svc.Ports {
|
||||
svcPorts = append(svcPorts, &pbcatalog.ServicePort{
|
||||
TargetPort: name,
|
||||
Protocol: pbcatalog.Protocol_PROTOCOL_TCP, // TODO
|
||||
})
|
||||
}
|
||||
if !svc.DisableServiceMesh {
|
||||
svcPorts = append(svcPorts, &pbcatalog.ServicePort{
|
||||
TargetPort: "mesh", Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
|
||||
Protocol: cfg.ActualProtocol,
|
||||
})
|
||||
}
|
||||
|
||||
v2svc := &pbcatalog.Service{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{svc.Workload},
|
||||
},
|
||||
Ports: svcPorts,
|
||||
Workloads: &pbcatalog.WorkloadSelector{},
|
||||
Ports: svcPorts,
|
||||
}
|
||||
|
||||
c.Services[svc.ID] = v2svc
|
||||
prev, ok := c.Services[svc.ID]
|
||||
if !ok {
|
||||
c.Services[svc.ID] = v2svc
|
||||
prev = v2svc
|
||||
}
|
||||
if prev.Workloads == nil {
|
||||
prev.Workloads = &pbcatalog.WorkloadSelector{}
|
||||
}
|
||||
prev.Workloads.Names = append(prev.Workloads.Names, svc.Workload)
|
||||
|
||||
} else {
|
||||
for _, name := range svc.V2Services {
|
||||
|
@ -466,20 +504,45 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error
|
|||
}
|
||||
}
|
||||
|
||||
if len(svc.WorkloadIdentities) == 0 {
|
||||
svc.WorkloadIdentities = []string{svc.ID.Name}
|
||||
if svc.WorkloadIdentity == "" {
|
||||
svc.WorkloadIdentity = svc.ID.Name
|
||||
}
|
||||
} else {
|
||||
if len(svc.V2Services) > 0 {
|
||||
return nil, fmt.Errorf("cannot specify v2 services for v1")
|
||||
}
|
||||
if len(svc.WorkloadIdentities) > 0 {
|
||||
if svc.WorkloadIdentity != "" {
|
||||
return nil, fmt.Errorf("cannot specify workload identities for v1")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := assignVirtualIPs(c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if c.EnableV2 {
|
||||
// Populate the VirtualPort field on all implied upstreams.
|
||||
for _, n := range c.Nodes {
|
||||
for _, svc := range n.Services {
|
||||
for _, u := range svc.ImpliedUpstreams {
|
||||
res, ok := c.Services[u.ID]
|
||||
if ok {
|
||||
for _, sp := range res.Ports {
|
||||
if sp.Protocol == pbcatalog.Protocol_PROTOCOL_MESH {
|
||||
continue
|
||||
}
|
||||
if sp.TargetPort == u.PortName {
|
||||
u.VirtualPort = sp.VirtualPort
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Explode this into the explicit list based on stray references made.
|
||||
c.Partitions = nil
|
||||
for ap, nsMap := range tenancies {
|
||||
|
@ -605,6 +668,21 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error
|
|||
// this helps in generating fortio assertions; otherwise field is ignored
|
||||
u.ID.Partition = remotePeer.Link.Partition
|
||||
}
|
||||
for _, u := range svc.ImpliedUpstreams {
|
||||
if u.Peer == "" {
|
||||
u.Cluster = c.Name
|
||||
u.Peering = nil
|
||||
continue
|
||||
}
|
||||
remotePeer, ok := c.Peerings[u.Peer]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("not possible")
|
||||
}
|
||||
u.Cluster = remotePeer.Link.Name
|
||||
u.Peering = remotePeer.Link
|
||||
// this helps in generating fortio assertions; otherwise field is ignored
|
||||
u.ID.Partition = remotePeer.Link.Partition
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -671,6 +749,51 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error
|
|||
return t, nil
|
||||
}
|
||||
|
||||
func assignVirtualIPs(c *Cluster) error {
|
||||
lastVIPIndex := 1
|
||||
for _, svcData := range c.Services {
|
||||
lastVIPIndex++
|
||||
if lastVIPIndex > 250 {
|
||||
return fmt.Errorf("too many ips using this approach to VIPs")
|
||||
}
|
||||
svcData.VirtualIps = []string{
|
||||
fmt.Sprintf("10.244.0.%d", lastVIPIndex),
|
||||
}
|
||||
|
||||
// populate virtual ports where we forgot them
|
||||
var (
|
||||
usedPorts = make(map[uint32]struct{})
|
||||
next = uint32(8080)
|
||||
)
|
||||
for _, sp := range svcData.Ports {
|
||||
if sp.Protocol == pbcatalog.Protocol_PROTOCOL_MESH {
|
||||
continue
|
||||
}
|
||||
if sp.VirtualPort > 0 {
|
||||
usedPorts[sp.VirtualPort] = struct{}{}
|
||||
}
|
||||
}
|
||||
for _, sp := range svcData.Ports {
|
||||
if sp.Protocol == pbcatalog.Protocol_PROTOCOL_MESH {
|
||||
continue
|
||||
}
|
||||
if sp.VirtualPort > 0 {
|
||||
continue
|
||||
}
|
||||
RETRY:
|
||||
attempt := next
|
||||
next++
|
||||
_, used := usedPorts[attempt]
|
||||
if used {
|
||||
goto RETRY
|
||||
}
|
||||
usedPorts[attempt] = struct{}{}
|
||||
sp.VirtualPort = attempt
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const permutedWarning = "use the disabled node kind if you want to ignore a node"
|
||||
|
||||
func inheritAndValidateNodes(
|
||||
|
|
|
@ -38,13 +38,21 @@ func (i Images) LocalDataplaneImage() string {
|
|||
return "local/" + name + ":" + tag
|
||||
}
|
||||
|
||||
func (i Images) LocalDataplaneTProxyImage() string {
|
||||
return spliceImageNamesAndTags(i.Dataplane, i.Consul, "tproxy")
|
||||
}
|
||||
|
||||
func (i Images) EnvoyConsulImage() string {
|
||||
if i.Consul == "" || i.Envoy == "" {
|
||||
return spliceImageNamesAndTags(i.Consul, i.Envoy, "")
|
||||
}
|
||||
|
||||
func spliceImageNamesAndTags(base1, base2, nameSuffix string) string {
|
||||
if base1 == "" || base2 == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
img1, tag1, ok1 := strings.Cut(i.Consul, ":")
|
||||
img2, tag2, ok2 := strings.Cut(i.Envoy, ":")
|
||||
img1, tag1, ok1 := strings.Cut(base1, ":")
|
||||
img2, tag2, ok2 := strings.Cut(base2, ":")
|
||||
if !ok1 {
|
||||
tag1 = "latest"
|
||||
}
|
||||
|
@ -66,8 +74,12 @@ func (i Images) EnvoyConsulImage() string {
|
|||
name2 = repo2
|
||||
}
|
||||
|
||||
if nameSuffix != "" {
|
||||
nameSuffix = "-" + nameSuffix
|
||||
}
|
||||
|
||||
// ex: local/hashicorp-consul-and-envoyproxy-envoy:1.15.0-with-v1.26.2
|
||||
return "local/" + name1 + "-and-" + name2 + ":" + tag1 + "-with-" + tag2
|
||||
return "local/" + name1 + "-and-" + name2 + nameSuffix + ":" + tag1 + "-with-" + tag2
|
||||
}
|
||||
|
||||
// TODO: what is this for and why do we need to do this and why is it named this?
|
||||
|
|
|
@ -22,6 +22,12 @@ func (t *Topology) ComputeRelationships() []Relationship {
|
|||
Upstream: u,
|
||||
})
|
||||
}
|
||||
for _, u := range s.ImpliedUpstreams {
|
||||
out = append(out, Relationship{
|
||||
Caller: s,
|
||||
Upstream: u,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -35,6 +41,10 @@ func RenderRelationships(ships []Relationship) string {
|
|||
w := tabwriter.NewWriter(&buf, 0, 0, 3, ' ', tabwriter.Debug)
|
||||
fmt.Fprintf(w, "DOWN\tnode\tservice\tport\tUP\tservice\t\n")
|
||||
for _, r := range ships {
|
||||
suffix := ""
|
||||
if r.Upstream.Implied {
|
||||
suffix = " (implied)"
|
||||
}
|
||||
fmt.Fprintf(w,
|
||||
"%s\t%s\t%s\t%d\t%s\t%s\t\n",
|
||||
r.downCluster(),
|
||||
|
@ -42,7 +52,7 @@ func RenderRelationships(ships []Relationship) string {
|
|||
r.Caller.ID.String(),
|
||||
r.Upstream.LocalPort,
|
||||
r.upCluster(),
|
||||
r.Upstream.ID.String(),
|
||||
r.Upstream.ID.String()+suffix,
|
||||
)
|
||||
}
|
||||
fmt.Fprintf(w, "\t\t\t\t\t\t\n")
|
||||
|
@ -57,14 +67,19 @@ type Relationship struct {
|
|||
}
|
||||
|
||||
func (r Relationship) String() string {
|
||||
suffix := ""
|
||||
if r.Upstream.PortName != "" {
|
||||
suffix = " port " + r.Upstream.PortName
|
||||
}
|
||||
return fmt.Sprintf(
|
||||
"%s on %s in %s via :%d => %s in %s",
|
||||
"%s on %s in %s via :%d => %s in %s%s",
|
||||
r.Caller.ID.String(),
|
||||
r.Caller.Node.ID().String(),
|
||||
r.downCluster(),
|
||||
r.Upstream.LocalPort,
|
||||
r.Upstream.ID.String(),
|
||||
r.upCluster(),
|
||||
suffix,
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"net/netip"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
|
||||
|
@ -717,6 +718,32 @@ type ServiceAndNode struct {
|
|||
Node *Node
|
||||
}
|
||||
|
||||
// Protocol is a convenience function to use when authoring topology configs.
|
||||
func Protocol(s string) (pbcatalog.Protocol, bool) {
|
||||
switch strings.ToLower(s) {
|
||||
case "tcp":
|
||||
return pbcatalog.Protocol_PROTOCOL_TCP, true
|
||||
case "http":
|
||||
return pbcatalog.Protocol_PROTOCOL_HTTP, true
|
||||
case "http2":
|
||||
return pbcatalog.Protocol_PROTOCOL_HTTP2, true
|
||||
case "grpc":
|
||||
return pbcatalog.Protocol_PROTOCOL_GRPC, true
|
||||
case "mesh":
|
||||
return pbcatalog.Protocol_PROTOCOL_MESH, true
|
||||
default:
|
||||
return pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, false
|
||||
}
|
||||
}
|
||||
|
||||
type Port struct {
|
||||
Number int
|
||||
Protocol string `json:",omitempty"`
|
||||
|
||||
// denormalized at topology compile
|
||||
ActualProtocol pbcatalog.Protocol `json:",omitempty"`
|
||||
}
|
||||
|
||||
// TODO(rb): really this should now be called "workload" or "instance"
|
||||
type Service struct {
|
||||
ID ServiceID
|
||||
|
@ -728,15 +755,7 @@ type Service struct {
|
|||
// Ports is the v2 multi-port list for this service.
|
||||
//
|
||||
// This only applies for multi-port (v2).
|
||||
Ports map[string]int `json:",omitempty"`
|
||||
|
||||
// ExposedPort is the exposed docker port corresponding to 'Port'.
|
||||
ExposedPort int `json:",omitempty"`
|
||||
|
||||
// ExposedPorts are the exposed docker ports corresponding to 'Ports'.
|
||||
//
|
||||
// This only applies for multi-port (v2).
|
||||
ExposedPorts map[string]int `json:",omitempty"`
|
||||
Ports map[string]*Port `json:",omitempty"`
|
||||
|
||||
// V2Services contains service names (which are merged with the tenancy
|
||||
// info from ID) to resolve services in the Services slice in the Cluster
|
||||
|
@ -748,14 +767,14 @@ type Service struct {
|
|||
// This only applies for multi-port (v2).
|
||||
V2Services []string `json:",omitempty"`
|
||||
|
||||
// WorkloadIdentities contains named WorkloadIdentities to assign to this
|
||||
// WorkloadIdentity contains named WorkloadIdentity to assign to this
|
||||
// workload.
|
||||
//
|
||||
// If omitted it is inferred that the ID.Name field is the singular
|
||||
// identity for this workload.
|
||||
//
|
||||
// This only applies for multi-port (v2).
|
||||
WorkloadIdentities []string `json:",omitempty"`
|
||||
WorkloadIdentity string `json:",omitempty"`
|
||||
|
||||
Disabled bool `json:",omitempty"` // TODO
|
||||
|
||||
|
@ -774,9 +793,11 @@ type Service struct {
|
|||
Command []string `json:",omitempty"` // optional
|
||||
Env []string `json:",omitempty"` // optional
|
||||
|
||||
DisableServiceMesh bool `json:",omitempty"`
|
||||
IsMeshGateway bool `json:",omitempty"`
|
||||
Upstreams []*Upstream
|
||||
EnableTransparentProxy bool `json:",omitempty"`
|
||||
DisableServiceMesh bool `json:",omitempty"`
|
||||
IsMeshGateway bool `json:",omitempty"`
|
||||
Upstreams []*Upstream `json:",omitempty"`
|
||||
ImpliedUpstreams []*Upstream `json:",omitempty"`
|
||||
|
||||
// denormalized at topology compile
|
||||
Node *Node `json:"-"`
|
||||
|
@ -784,9 +805,28 @@ type Service struct {
|
|||
Workload string `json:"-"`
|
||||
}
|
||||
|
||||
func (s *Service) ExposedPort(name string) int {
|
||||
if s.Node == nil {
|
||||
panic("ExposedPort cannot be called until after Compile")
|
||||
}
|
||||
|
||||
var internalPort int
|
||||
if name == "" {
|
||||
internalPort = s.Port
|
||||
} else {
|
||||
port, ok := s.Ports[name]
|
||||
if !ok {
|
||||
panic("port with name " + name + " not present on service")
|
||||
}
|
||||
internalPort = port.Number
|
||||
}
|
||||
|
||||
return s.Node.ExposedPort(internalPort)
|
||||
}
|
||||
|
||||
func (s *Service) PortOrDefault(name string) int {
|
||||
if len(s.Ports) > 0 {
|
||||
return s.Ports[name]
|
||||
return s.Ports[name].Number
|
||||
}
|
||||
return s.Port
|
||||
}
|
||||
|
@ -800,8 +840,6 @@ func (s *Service) IsV1() bool {
|
|||
}
|
||||
|
||||
func (s *Service) inheritFromExisting(existing *Service) {
|
||||
s.ExposedPort = existing.ExposedPort
|
||||
s.ExposedPorts = existing.ExposedPorts
|
||||
s.ExposedEnvoyAdminPort = existing.ExposedEnvoyAdminPort
|
||||
}
|
||||
|
||||
|
@ -810,10 +848,10 @@ func (s *Service) ports() []int {
|
|||
if len(s.Ports) > 0 {
|
||||
seen := make(map[int]struct{})
|
||||
for _, port := range s.Ports {
|
||||
if _, ok := seen[port]; !ok {
|
||||
if _, ok := seen[port.Number]; !ok {
|
||||
// It's totally fine to expose the same port twice in a workload.
|
||||
seen[port] = struct{}{}
|
||||
out = append(out, port)
|
||||
seen[port.Number] = struct{}{}
|
||||
out = append(out, port.Number)
|
||||
}
|
||||
}
|
||||
} else if s.Port > 0 {
|
||||
|
@ -838,7 +876,6 @@ func (s *Service) HasCheck() bool {
|
|||
}
|
||||
|
||||
func (s *Service) DigestExposedPorts(ports map[int]int) {
|
||||
s.ExposedPort = ports[s.Port]
|
||||
if s.EnvoyAdminPort > 0 {
|
||||
s.ExposedEnvoyAdminPort = ports[s.EnvoyAdminPort]
|
||||
} else {
|
||||
|
@ -858,15 +895,39 @@ func (s *Service) Validate() error {
|
|||
return fmt.Errorf("cannot specify both singleport and multiport on service in v2")
|
||||
}
|
||||
if s.Port > 0 {
|
||||
s.Ports = map[string]int{"legacy": s.Port}
|
||||
s.Ports = map[string]*Port{
|
||||
"legacy": {
|
||||
Number: s.Port,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
}
|
||||
s.Port = 0
|
||||
}
|
||||
|
||||
for name, port := range s.Ports {
|
||||
if port <= 0 {
|
||||
return fmt.Errorf("service has invalid port %q", name)
|
||||
if !s.DisableServiceMesh && s.EnvoyPublicListenerPort > 0 {
|
||||
s.Ports["mesh"] = &Port{
|
||||
Number: s.EnvoyPublicListenerPort,
|
||||
Protocol: "mesh",
|
||||
}
|
||||
}
|
||||
|
||||
for name, port := range s.Ports {
|
||||
if port == nil {
|
||||
return fmt.Errorf("cannot be nil")
|
||||
}
|
||||
if port.Number <= 0 {
|
||||
return fmt.Errorf("service has invalid port number %q", name)
|
||||
}
|
||||
if port.ActualProtocol != pbcatalog.Protocol_PROTOCOL_UNSPECIFIED {
|
||||
return fmt.Errorf("user cannot specify ActualProtocol field")
|
||||
}
|
||||
|
||||
proto, valid := Protocol(port.Protocol)
|
||||
if !valid {
|
||||
return fmt.Errorf("service has invalid port protocol %q", port.Protocol)
|
||||
}
|
||||
port.ActualProtocol = proto
|
||||
}
|
||||
} else {
|
||||
if len(s.Ports) > 0 {
|
||||
return fmt.Errorf("cannot specify mulitport on service in v1")
|
||||
|
@ -874,6 +935,9 @@ func (s *Service) Validate() error {
|
|||
if s.Port <= 0 {
|
||||
return fmt.Errorf("service has invalid port")
|
||||
}
|
||||
if s.EnableTransparentProxy {
|
||||
return fmt.Errorf("tproxy does not work with v1 yet")
|
||||
}
|
||||
}
|
||||
if s.DisableServiceMesh && s.IsMeshGateway {
|
||||
return fmt.Errorf("cannot disable service mesh and still run a mesh gateway")
|
||||
|
@ -881,6 +945,12 @@ func (s *Service) Validate() error {
|
|||
if s.DisableServiceMesh && len(s.Upstreams) > 0 {
|
||||
return fmt.Errorf("cannot disable service mesh and configure upstreams")
|
||||
}
|
||||
if s.DisableServiceMesh && len(s.ImpliedUpstreams) > 0 {
|
||||
return fmt.Errorf("cannot disable service mesh and configure implied upstreams")
|
||||
}
|
||||
if s.DisableServiceMesh && s.EnableTransparentProxy {
|
||||
return fmt.Errorf("cannot disable service mesh and activate tproxy")
|
||||
}
|
||||
|
||||
if s.DisableServiceMesh {
|
||||
if s.EnvoyAdminPort != 0 {
|
||||
|
@ -906,6 +976,20 @@ func (s *Service) Validate() error {
|
|||
return fmt.Errorf("upstream local address is invalid: %s", u.LocalAddress)
|
||||
}
|
||||
}
|
||||
if u.Implied {
|
||||
return fmt.Errorf("implied field cannot be set")
|
||||
}
|
||||
}
|
||||
for _, u := range s.ImpliedUpstreams {
|
||||
if u.ID.Name == "" {
|
||||
return fmt.Errorf("implied upstream service name is required")
|
||||
}
|
||||
if u.LocalPort > 0 {
|
||||
return fmt.Errorf("implied upstream local port cannot be set")
|
||||
}
|
||||
if u.LocalAddress != "" {
|
||||
return fmt.Errorf("implied upstream local address cannot be set")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -924,8 +1008,10 @@ type Upstream struct {
|
|||
// TODO: what about mesh gateway mode overrides?
|
||||
|
||||
// computed at topology compile
|
||||
Cluster string `json:",omitempty"`
|
||||
Peering *PeerCluster `json:",omitempty"` // this will have Link!=nil
|
||||
Cluster string `json:",omitempty"`
|
||||
Peering *PeerCluster `json:",omitempty"` // this will have Link!=nil
|
||||
Implied bool `json:",omitempty"`
|
||||
VirtualPort uint32 `json:",omitempty"`
|
||||
}
|
||||
|
||||
type Peering struct {
|
||||
|
|
Loading…
Reference in New Issue