mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 03:29:43 +00:00
61b7c0d76f
* Support locality testing in consul-container Support including locality in client sidecar config. Also align test config structs with Ent to avoid future conflicts. * Refactor consul-container fortio helpers Refactor fortio test helpers to separate HTTP retries from waiting on fortio result changes due to e.g. service startup and failovers.
311 lines
9.7 KiB
Go
311 lines
9.7 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package topology
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/testcontainers/testcontainers-go"
|
|
|
|
"github.com/hashicorp/consul/api"
|
|
|
|
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
|
|
libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
|
|
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
|
|
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
|
)
|
|
|
|
const (
|
|
AcceptingPeerName = "accepting-to-dialer"
|
|
DialingPeerName = "dialing-to-acceptor"
|
|
)
|
|
|
|
type BuiltCluster struct {
|
|
Cluster *libcluster.Cluster
|
|
Context *libcluster.BuildContext
|
|
Service libservice.Service
|
|
Container libservice.Service
|
|
Gateway libservice.Service
|
|
}
|
|
|
|
type PeeringClusterSize struct {
|
|
AcceptingNumServers int
|
|
AcceptingNumClients int
|
|
DialingNumServers int
|
|
DialingNumClients int
|
|
}
|
|
|
|
// BasicPeeringTwoClustersSetup sets up a scenario for testing peering, which consists of
|
|
//
|
|
// - an accepting cluster with 3 servers and 1 client agent. The client should be used to
|
|
// host a service for export: staticServerSvc.
|
|
// - an dialing cluster with 1 server and 1 client. The client should be used to host a
|
|
// service connecting to staticServerSvc.
|
|
// - Create the peering, export the service from accepting cluster, and verify service
|
|
// connectivity.
|
|
//
|
|
// It returns objects of the accepting cluster, dialing cluster, staticServerSvc, and staticClientSvcSidecar
|
|
func BasicPeeringTwoClustersSetup(
|
|
t *testing.T,
|
|
consulImage string,
|
|
consulVersion string,
|
|
pcs PeeringClusterSize,
|
|
peeringThroughMeshgateway bool,
|
|
) (*BuiltCluster, *BuiltCluster) {
|
|
acceptingCluster, acceptingCtx, acceptingClient := NewCluster(t, &ClusterConfig{
|
|
NumServers: pcs.AcceptingNumServers,
|
|
NumClients: pcs.AcceptingNumClients,
|
|
BuildOpts: &libcluster.BuildOptions{
|
|
Datacenter: "dc1",
|
|
ConsulImageName: consulImage,
|
|
ConsulVersion: consulVersion,
|
|
InjectAutoEncryption: true,
|
|
},
|
|
ApplyDefaultProxySettings: true,
|
|
})
|
|
|
|
dialingCluster, dialingCtx, dialingClient := NewCluster(t, &ClusterConfig{
|
|
NumServers: pcs.DialingNumServers,
|
|
NumClients: pcs.DialingNumClients,
|
|
BuildOpts: &libcluster.BuildOptions{
|
|
Datacenter: "dc2",
|
|
ConsulImageName: consulImage,
|
|
ConsulVersion: consulVersion,
|
|
InjectAutoEncryption: true,
|
|
},
|
|
ApplyDefaultProxySettings: true,
|
|
})
|
|
|
|
// Create the mesh gateway for dataplane traffic and peering control plane traffic (if enabled)
|
|
gwCfg := libservice.GatewayConfig{
|
|
Name: "mesh",
|
|
Kind: "mesh",
|
|
}
|
|
acceptingClusterGateway, err := libservice.NewGatewayService(context.Background(), gwCfg, acceptingCluster.Clients()[0])
|
|
require.NoError(t, err)
|
|
dialingClusterGateway, err := libservice.NewGatewayService(context.Background(), gwCfg, dialingCluster.Clients()[0])
|
|
require.NoError(t, err)
|
|
|
|
// Enable peering control plane traffic through mesh gateway
|
|
if peeringThroughMeshgateway {
|
|
req := &api.MeshConfigEntry{
|
|
Peering: &api.PeeringMeshConfig{
|
|
PeerThroughMeshGateways: true,
|
|
},
|
|
}
|
|
configCluster := func(cli *api.Client) error {
|
|
libassert.CatalogServiceExists(t, cli, "mesh", nil)
|
|
ok, _, err := cli.ConfigEntries().Set(req, &api.WriteOptions{})
|
|
if !ok {
|
|
return fmt.Errorf("config entry is not set")
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("error writing config entry: %s", err)
|
|
}
|
|
return nil
|
|
}
|
|
err = configCluster(dialingClient)
|
|
require.NoError(t, err)
|
|
err = configCluster(acceptingClient)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
require.NoError(t, dialingCluster.PeerWithCluster(acceptingClient, AcceptingPeerName, DialingPeerName))
|
|
|
|
libassert.PeeringStatus(t, acceptingClient, AcceptingPeerName, api.PeeringStateActive)
|
|
// libassert.PeeringExports(t, acceptingClient, acceptingPeerName, 1)
|
|
|
|
// Register an static-server service in acceptingCluster and export to dialing cluster
|
|
var serverService, serverSidecarService libservice.Service
|
|
{
|
|
clientNode := acceptingCluster.Clients()[0]
|
|
|
|
// Create a service and proxy instance
|
|
var err error
|
|
// Create a service and proxy instance
|
|
serviceOpts := libservice.ServiceOpts{
|
|
Name: libservice.StaticServerServiceName,
|
|
ID: "static-server",
|
|
Meta: map[string]string{"version": ""},
|
|
HTTPPort: 8080,
|
|
GRPCPort: 8079,
|
|
}
|
|
serverService, serverSidecarService, err = libservice.CreateAndRegisterStaticServerAndSidecar(clientNode, &serviceOpts)
|
|
require.NoError(t, err)
|
|
|
|
libassert.CatalogServiceExists(t, acceptingClient, libservice.StaticServerServiceName, nil)
|
|
libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy", nil)
|
|
|
|
require.NoError(t, serverService.Export("default", AcceptingPeerName, acceptingClient))
|
|
}
|
|
|
|
// Register an static-client service in dialing cluster and set upstream to static-server service
|
|
var clientSidecarService *libservice.ConnectContainer
|
|
{
|
|
clientNode := dialingCluster.Clients()[0]
|
|
|
|
// Create a service and proxy instance
|
|
var err error
|
|
clientSidecarService, err = libservice.CreateAndRegisterStaticClientSidecar(clientNode, DialingPeerName, true, false, nil)
|
|
require.NoError(t, err)
|
|
|
|
libassert.CatalogServiceExists(t, dialingClient, "static-client-sidecar-proxy", nil)
|
|
|
|
}
|
|
|
|
_, adminPort := clientSidecarService.GetAdminAddr()
|
|
libassert.AssertUpstreamEndpointStatus(t, adminPort, fmt.Sprintf("static-server.default.%s.external", DialingPeerName), "HEALTHY", 1)
|
|
_, port := clientSidecarService.GetAddr()
|
|
libassert.HTTPServiceEchoes(t, "localhost", port, "")
|
|
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", port), libservice.StaticServerServiceName, "")
|
|
|
|
return &BuiltCluster{
|
|
Cluster: acceptingCluster,
|
|
Context: acceptingCtx,
|
|
Service: serverSidecarService,
|
|
Container: serverSidecarService,
|
|
Gateway: acceptingClusterGateway,
|
|
},
|
|
&BuiltCluster{
|
|
Cluster: dialingCluster,
|
|
Context: dialingCtx,
|
|
Service: nil,
|
|
Container: clientSidecarService,
|
|
Gateway: dialingClusterGateway,
|
|
}
|
|
}
|
|
|
|
type ClusterConfig struct {
|
|
NumServers int
|
|
NumClients int
|
|
ApplyDefaultProxySettings bool
|
|
BuildOpts *libcluster.BuildOptions
|
|
Cmd string
|
|
LogConsumer *TestLogConsumer
|
|
|
|
// Exposed Ports are available on the cluster's pause container for the purposes
|
|
// of adding external communication to the cluster. An example would be a listener
|
|
// on a gateway.
|
|
ExposedPorts []int
|
|
}
|
|
|
|
func NewCluster(
|
|
t *testing.T,
|
|
config *ClusterConfig,
|
|
) (*libcluster.Cluster, *libcluster.BuildContext, *api.Client) {
|
|
return NewClusterWithConfig(t, config, "", "")
|
|
}
|
|
|
|
// NewCluster creates a cluster with peering enabled. It also creates
|
|
// and registers a mesh-gateway at the client agent. The API client returned is
|
|
// pointed at the client agent.
|
|
// - proxy-defaults.protocol = tcp
|
|
func NewClusterWithConfig(
|
|
t *testing.T,
|
|
config *ClusterConfig,
|
|
serverHclConfig string,
|
|
clientHclConfig string,
|
|
) (*libcluster.Cluster, *libcluster.BuildContext, *api.Client) {
|
|
var (
|
|
cluster *libcluster.Cluster
|
|
err error
|
|
)
|
|
require.NotEmpty(t, config.BuildOpts.Datacenter)
|
|
require.True(t, config.NumServers > 0)
|
|
|
|
opts := libcluster.BuildOptions{
|
|
Datacenter: config.BuildOpts.Datacenter,
|
|
InjectAutoEncryption: config.BuildOpts.InjectAutoEncryption,
|
|
InjectGossipEncryption: true,
|
|
AllowHTTPAnyway: true,
|
|
ConsulVersion: config.BuildOpts.ConsulVersion,
|
|
ACLEnabled: config.BuildOpts.ACLEnabled,
|
|
LogStore: config.BuildOpts.LogStore,
|
|
}
|
|
ctx := libcluster.NewBuildContext(t, opts)
|
|
|
|
serverConf := libcluster.NewConfigBuilder(ctx).
|
|
Bootstrap(config.NumServers).
|
|
Peering(true).
|
|
ToAgentConfig(t)
|
|
t.Logf("%s server config: \n%s", opts.Datacenter, serverConf.JSON)
|
|
|
|
// optional
|
|
if config.LogConsumer != nil {
|
|
serverConf.LogConsumer = config.LogConsumer
|
|
}
|
|
|
|
t.Logf("Cluster config:\n%s", serverConf.JSON)
|
|
|
|
// optional custom cmd
|
|
if config.Cmd != "" {
|
|
serverConf.Cmd = append(serverConf.Cmd, config.Cmd)
|
|
}
|
|
|
|
if serverHclConfig != "" {
|
|
serverConf.MutatebyAgentConfig(serverHclConfig)
|
|
}
|
|
|
|
if config.ExposedPorts != nil {
|
|
cluster, err = libcluster.New(t, []libcluster.Config{*serverConf}, config.ExposedPorts...)
|
|
} else {
|
|
cluster, err = libcluster.NewN(t, *serverConf, config.NumServers)
|
|
}
|
|
require.NoError(t, err)
|
|
// builder generates certs for us, so copy them back
|
|
if opts.InjectAutoEncryption {
|
|
cluster.CACert = serverConf.CACert
|
|
}
|
|
|
|
var retryJoin []string
|
|
for i := 0; i < config.NumServers; i++ {
|
|
retryJoin = append(retryJoin, fmt.Sprintf("agent-%d", i))
|
|
}
|
|
|
|
// Add numClients static clients to register the service
|
|
configbuiilder := libcluster.NewConfigBuilder(ctx).
|
|
Client().
|
|
Peering(true).
|
|
RetryJoin(retryJoin...)
|
|
clientConf := configbuiilder.ToAgentConfig(t)
|
|
t.Logf("%s client config: \n%s", opts.Datacenter, clientConf.JSON)
|
|
if clientHclConfig != "" {
|
|
clientConf.MutatebyAgentConfig(clientHclConfig)
|
|
}
|
|
|
|
require.NoError(t, cluster.AddN(*clientConf, config.NumClients, true))
|
|
|
|
// Use the client agent as the HTTP endpoint since we will not rotate it in many tests.
|
|
var client *api.Client
|
|
if config.NumClients > 0 {
|
|
clientNode := cluster.Agents[config.NumServers]
|
|
client = clientNode.GetClient()
|
|
} else {
|
|
client = cluster.Agents[0].GetClient()
|
|
}
|
|
libcluster.WaitForLeader(t, cluster, client)
|
|
libcluster.WaitForMembers(t, client, config.NumServers+config.NumClients)
|
|
|
|
// Default Proxy Settings
|
|
if config.ApplyDefaultProxySettings {
|
|
ok, err := utils.ApplyDefaultProxySettings(client)
|
|
require.NoError(t, err)
|
|
require.True(t, ok)
|
|
}
|
|
|
|
return cluster, ctx, client
|
|
}
|
|
|
|
type TestLogConsumer struct {
|
|
Msgs []string
|
|
}
|
|
|
|
func (g *TestLogConsumer) Accept(l testcontainers.Log) {
|
|
g.Msgs = append(g.Msgs, string(l.Content))
|
|
}
|