consul/test/integration/consul-container/libs/service/helpers.go

415 lines
13 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package service
import (
"context"
"fmt"
"testing"
"github.com/hashicorp/consul/api"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
)
// Service names used by the static-server / static-client helpers in this file.
const (
// StaticServerServiceName is the destination name of the default upstream
// configured for the static-client sidecar.
StaticServerServiceName = "static-server"
// StaticServer2ServiceName is presumably the name for a second static-server
// instance; it is not referenced by the helpers visible here (verify against callers).
StaticServer2ServiceName = "static-server-2"
// StaticClientServiceName is the default service name registered by
// CreateAndRegisterStaticClientSidecar.
StaticClientServiceName = "static-client"
)
// Option fragments embedded in ServiceOpts.
type (
	// Checks describes the single TTL check attached to a service by
	// CreateAndRegisterStaticServerAndSidecarWithChecks.
	Checks struct {
		Name string // check name
		TTL  string // check TTL, passed through unchanged to the check registration
	}

	// SidecarService carries the sidecar-specific parts of a service registration.
	SidecarService struct {
		Port  int          // port assigned to the sidecar service registration
		Proxy ConnectProxy // proxy settings for the sidecar
	}

	// ConnectProxy holds the proxy mode requested for a sidecar. The value is
	// converted to api.ProxyMode when the registration is built.
	ConnectProxy struct {
		Mode string
	}
)
// ServiceOpts collects the options the CreateAndRegister* helpers use to build
// an api.AgentServiceRegistration and to start the matching containers.
// Not every helper reads every field; see the per-field notes.
type ServiceOpts struct {
// Name is the service name to register.
Name string
// ID is the service ID to register. The static-server helpers also use it to
// name the sidecar ("<ID>-sidecar"). CreateAndRegisterStaticClientSidecar ignores it.
ID string
// Meta is copied to the registration's Meta.
Meta map[string]string
// HTTPPort is passed to the service container constructor and is the
// registered service port unless RegisterGRPC is set.
HTTPPort int
// GRPCPort is passed to the service container constructor and is the
// registered service port when RegisterGRPC is set.
GRPCPort int
// if true, register GRPC port instead of HTTP (default)
RegisterGRPC bool
// Checks is read only by CreateAndRegisterStaticServerAndSidecarWithChecks.
Checks Checks
// Connect supplies the sidecar proxy mode and, for the helpers that read it,
// the sidecar port.
Connect SidecarService
// Namespace, Partition and Locality are copied to the registration as-is.
Namespace string
Partition string
Locality *api.Locality
// Upstreams, when non-empty, replaces the sidecar upstreams in
// CreateAndRegisterStaticClientSidecar; the static-server helpers do not read it.
Upstreams []api.Upstream
}
// createAndRegisterStaticServerAndSidecar registers svc with the agent on node and then
// launches two containers for it: the example static-server and its sidecar proxy.
//
// httpPort, grpcPort and containerArgs are passed to the static-server container
// constructor; customContainerCfg is passed to the sidecar container constructor.
// It returns the server service, the sidecar service, and the first error encountered.
func createAndRegisterStaticServerAndSidecar(node libcluster.Agent, httpPort int, grpcPort int, svc *api.AgentServiceRegistration, customContainerCfg func(testcontainers.ContainerRequest) testcontainers.ContainerRequest, containerArgs ...string) (Service, Service, error) {
	// Teardown hooks are queued as each container comes up and executed on
	// return, so a partial failure is torn down. The Reset at the end discards
	// them so that a fully successful call leaves both containers running.
	var teardown utils.ResettableDefer
	defer teardown.Execute()

	ctx := context.Background()

	registerErr := node.GetClient().Agent().ServiceRegister(svc)
	if registerErr != nil {
		return nil, nil, registerErr
	}

	// Start the application container.
	server, err := NewExampleService(ctx, svc.ID, httpPort, grpcPort, node, containerArgs...)
	if err != nil {
		return nil, nil, err
	}
	teardown.Add(func() { _ = server.Terminate() })

	// Transparent proxy is enabled only when the registration explicitly
	// carries a sidecar proxy config in transparent mode.
	tproxy := false
	if connect := svc.Connect; connect != nil && connect.SidecarService != nil && connect.SidecarService.Proxy != nil {
		tproxy = connect.SidecarService.Proxy.Mode == api.ProxyModeTransparent
	}
	cfg := SidecarConfig{
		Name:         fmt.Sprintf("%s-sidecar", svc.ID),
		ServiceID:    svc.ID,
		Namespace:    svc.Namespace,
		Partition:    svc.Partition,
		EnableTProxy: tproxy,
	}

	// Start the sidecar container (bindPort not used).
	sidecar, err := NewConnectService(ctx, cfg, []int{svc.Port}, node, customContainerCfg)
	if err != nil {
		return nil, nil, err
	}
	teardown.Add(func() { _ = sidecar.Terminate() })

	// Both objects now expose Terminate(); ownership passes to the caller.
	teardown.Reset()
	return server, sidecar, nil
}
// createAndRegisterCustomServiceAndSidecar creates a custom service from the given testcontainers.ContainerRequest
// and a sidecar proxy for the service. The customContainerCfg parameter is used to mutate the
// testcontainers.ContainerRequest for the sidecar proxy.
//
// svc is registered with the agent on node before any container is started.
// httpPort and grpcPort are passed to the custom service container constructor.
// It returns the custom service, its sidecar, and the first error encountered.
func createAndRegisterCustomServiceAndSidecar(node libcluster.Agent,
	httpPort int,
	grpcPort int,
	svc *api.AgentServiceRegistration,
	request testcontainers.ContainerRequest,
	customContainerCfg func(testcontainers.ContainerRequest) testcontainers.ContainerRequest,
) (Service, Service, error) {
	// Do some trickery to ensure that partial completion is correctly torn
	// down, but successful execution is not.
	var deferClean utils.ResettableDefer
	defer deferClean.Execute()

	if err := node.GetClient().Agent().ServiceRegister(svc); err != nil {
		return nil, nil, err
	}

	// Create a service and proxy instance
	serverService, err := NewCustomService(context.Background(), svc.ID, httpPort, grpcPort, node, request)
	if err != nil {
		return nil, nil, err
	}
	deferClean.Add(func() {
		_ = serverService.Terminate()
	})

	sidecarCfg := SidecarConfig{
		Name:      fmt.Sprintf("%s-sidecar", svc.ID),
		ServiceID: svc.ID,
		Namespace: svc.Namespace,
		// Keep the sidecar in the same partition as the registered service,
		// matching createAndRegisterStaticServerAndSidecar.
		Partition: svc.Partition,
		EnableTProxy: svc.Connect != nil &&
			svc.Connect.SidecarService != nil &&
			svc.Connect.SidecarService.Proxy != nil &&
			svc.Connect.SidecarService.Proxy.Mode == api.ProxyModeTransparent,
	}

	serverConnectProxy, err := NewConnectService(context.Background(), sidecarCfg, []int{svc.Port}, node, customContainerCfg) // bindPort not used
	if err != nil {
		return nil, nil, err
	}
	deferClean.Add(func() {
		_ = serverConnectProxy.Terminate()
	})

	// disable cleanup functions now that we have an object with a Terminate() function
	deferClean.Reset()

	return serverService, serverConnectProxy, nil
}
// CreateAndRegisterCustomServiceAndSidecar builds a service registration from serviceOpts,
// registers it on node, and starts a custom service container (described by request) together
// with its sidecar proxy. customContainerCfg is forwarded for the sidecar container.
//
// The registered port and health check follow serviceOpts.RegisterGRPC: a TCP check on
// HTTPPort by default, or a GRPC check on GRPCPort when it is set.
func CreateAndRegisterCustomServiceAndSidecar(node libcluster.Agent,
	serviceOpts *ServiceOpts,
	request testcontainers.ContainerRequest,
	customContainerCfg func(testcontainers.ContainerRequest) testcontainers.ContainerRequest) (Service, Service, error) {
	// Register the static-server service and sidecar first to prevent race with sidecar
	// trying to get xDS before it's ready
	listenCheck := &api.AgentServiceCheck{
		Name:     "Static Server Listening",
		Interval: "10s",
		Status:   api.HealthPassing,
	}

	registeredPort := serviceOpts.HTTPPort
	if serviceOpts.RegisterGRPC {
		registeredPort = serviceOpts.GRPCPort
		listenCheck.GRPC = fmt.Sprintf("127.0.0.1:%d", registeredPort)
	} else {
		listenCheck.TCP = fmt.Sprintf("127.0.0.1:%d", registeredPort)
	}

	sidecarReg := &api.AgentServiceRegistration{
		Proxy: &api.AgentServiceConnectProxyConfig{
			Mode: api.ProxyMode(serviceOpts.Connect.Proxy.Mode),
		},
	}

	registration := &api.AgentServiceRegistration{
		Name:      serviceOpts.Name,
		ID:        serviceOpts.ID,
		Port:      registeredPort,
		Connect:   &api.AgentServiceConnect{SidecarService: sidecarReg},
		Namespace: serviceOpts.Namespace,
		Partition: serviceOpts.Partition,
		Locality:  serviceOpts.Locality,
		Meta:      serviceOpts.Meta,
		Check:     listenCheck,
	}

	return createAndRegisterCustomServiceAndSidecar(node, serviceOpts.HTTPPort, serviceOpts.GRPCPort, registration, request, customContainerCfg)
}
// CreateAndRegisterStaticServerAndSidecarWithCustomContainerConfig registers an example static
// server described by serviceOpts on node and starts it along with a sidecar proxy.
//
// customContainerCfg maps a testcontainers.ContainerRequest to a testcontainers.ContainerRequest
// and can be used to mutate the sidecar proxy's container request, e.g. to inject custom
// configuration and lifecycle hooks. containerArgs are forwarded to the static-server container.
//
// The registered port and health check follow serviceOpts.RegisterGRPC: a TCP check on
// HTTPPort by default, or a GRPC check on GRPCPort when it is set.
func CreateAndRegisterStaticServerAndSidecarWithCustomContainerConfig(node libcluster.Agent,
	serviceOpts *ServiceOpts,
	customContainerCfg func(testcontainers.ContainerRequest) testcontainers.ContainerRequest,
	containerArgs ...string) (Service, Service, error) {
	// Register the static-server service and sidecar first to prevent race with sidecar
	// trying to get xDS before it's ready
	var (
		svcPort  = serviceOpts.HTTPPort
		tcpAddr  = fmt.Sprintf("127.0.0.1:%d", svcPort)
		grpcAddr string
	)
	if serviceOpts.RegisterGRPC {
		svcPort = serviceOpts.GRPCPort
		tcpAddr = ""
		grpcAddr = fmt.Sprintf("127.0.0.1:%d", svcPort)
	}

	healthCheck := api.AgentServiceCheck{
		Name:     "Static Server Listening",
		TCP:      tcpAddr,
		GRPC:     grpcAddr,
		Interval: "10s",
		Status:   api.HealthPassing,
	}

	proxyCfg := &api.AgentServiceConnectProxyConfig{
		Mode: api.ProxyMode(serviceOpts.Connect.Proxy.Mode),
	}
	connect := &api.AgentServiceConnect{
		SidecarService: &api.AgentServiceRegistration{Proxy: proxyCfg},
	}

	registration := &api.AgentServiceRegistration{
		Name:      serviceOpts.Name,
		ID:        serviceOpts.ID,
		Port:      svcPort,
		Connect:   connect,
		Namespace: serviceOpts.Namespace,
		Partition: serviceOpts.Partition,
		Locality:  serviceOpts.Locality,
		Meta:      serviceOpts.Meta,
		Check:     &healthCheck,
	}

	return createAndRegisterStaticServerAndSidecar(node, serviceOpts.HTTPPort, serviceOpts.GRPCPort, registration, customContainerCfg, containerArgs...)
}
// CreateAndRegisterStaticServerAndSidecar registers an example static server described by
// serviceOpts on node and starts it with a sidecar proxy, without any customization of the
// sidecar's container request. containerArgs are forwarded to the static-server container.
func CreateAndRegisterStaticServerAndSidecar(node libcluster.Agent, serviceOpts *ServiceOpts, containerArgs ...string) (Service, Service, error) {
	// A nil mutator means "no custom sidecar container configuration".
	var noCustomCfg func(testcontainers.ContainerRequest) testcontainers.ContainerRequest
	return CreateAndRegisterStaticServerAndSidecarWithCustomContainerConfig(node, serviceOpts, noCustomCfg, containerArgs...)
}
// CreateAndRegisterStaticServerAndSidecarWithChecks registers an example static server on node
// with a single TTL check taken from serviceOpts.Checks, then starts the server and its sidecar
// proxy. The service is always registered on serviceOpts.HTTPPort, and the sidecar is registered
// on serviceOpts.Connect.Port.
func CreateAndRegisterStaticServerAndSidecarWithChecks(node libcluster.Agent, serviceOpts *ServiceOpts) (Service, Service, error) {
	// Register the static-server service and sidecar first to prevent race with sidecar
	// trying to get xDS before it's ready
	sidecarReg := &api.AgentServiceRegistration{
		Proxy: &api.AgentServiceConnectProxyConfig{
			Mode: api.ProxyMode(serviceOpts.Connect.Proxy.Mode),
		},
		Port: serviceOpts.Connect.Port,
	}

	ttlCheck := &api.AgentServiceCheck{
		Name: serviceOpts.Checks.Name,
		TTL:  serviceOpts.Checks.TTL,
	}

	registration := &api.AgentServiceRegistration{
		Name:      serviceOpts.Name,
		ID:        serviceOpts.ID,
		Port:      serviceOpts.HTTPPort,
		Connect:   &api.AgentServiceConnect{SidecarService: sidecarReg},
		Checks:    api.AgentServiceChecks{ttlCheck},
		Meta:      serviceOpts.Meta,
		Namespace: serviceOpts.Namespace,
		Partition: serviceOpts.Partition,
		Locality:  serviceOpts.Locality,
	}

	return createAndRegisterStaticServerAndSidecar(node, serviceOpts.HTTPPort, serviceOpts.GRPCPort, registration, nil)
}
// CreateAndRegisterStaticClientSidecar registers a static-client service (with a connect
// sidecar) on node's agent and starts the sidecar proxy container for it.
//
// When enableTProxy is true the sidecar is registered in transparent proxy mode with no
// explicit upstreams. Otherwise a single upstream to StaticServerServiceName is configured,
// targeting peerName and bound to 0.0.0.0:libcluster.ServiceUpstreamLocalBindPort, using the
// local mesh gateway mode if localMeshGateway is true and the remote mode otherwise.
//
// serviceOpts is optional. When non-nil, Name, HTTPPort, Connect.Port and Upstreams override
// the defaults only if set, while Meta, Namespace, Partition and Locality are always copied to
// the registration. Setting serviceOpts.Connect.Proxy.Mode is rejected with an error before
// anything is registered; use enableTProxy and/or localMeshGateway instead.
//
// It returns the sidecar container, or the first error encountered.
func CreateAndRegisterStaticClientSidecar(
	node libcluster.Agent,
	peerName string,
	localMeshGateway bool,
	enableTProxy bool,
	serviceOpts *ServiceOpts,
) (*ConnectContainer, error) {
	// Do some trickery to ensure that partial completion is correctly torn
	// down, but successful execution is not.
	var deferClean utils.ResettableDefer
	defer deferClean.Execute()

	var proxy *api.AgentServiceConnectProxyConfig
	if enableTProxy {
		proxy = &api.AgentServiceConnectProxyConfig{
			Mode: api.ProxyModeTransparent,
		}
	} else {
		mgwMode := api.MeshGatewayModeRemote
		if localMeshGateway {
			mgwMode = api.MeshGatewayModeLocal
		}
		proxy = &api.AgentServiceConnectProxyConfig{
			Upstreams: []api.Upstream{{
				DestinationName:  StaticServerServiceName,
				DestinationPeer:  peerName,
				LocalBindAddress: "0.0.0.0",
				LocalBindPort:    libcluster.ServiceUpstreamLocalBindPort,
				MeshGateway: api.MeshGatewayConfig{
					Mode: mgwMode,
				},
			}},
		}
	}

	// Register the static-client service and sidecar first to prevent race with sidecar
	// trying to get xDS before it's ready
	req := &api.AgentServiceRegistration{
		Name: StaticClientServiceName,
		Port: 8080,
		Connect: &api.AgentServiceConnect{
			SidecarService: &api.AgentServiceRegistration{
				Proxy: proxy,
			},
		},
	}

	// Set relevant fields for static client if opts are provided
	if serviceOpts != nil {
		if serviceOpts.Connect.Proxy.Mode != "" {
			return nil, fmt.Errorf("this helper does not support directly setting connect proxy mode; use enableTProxy and/or localMeshGateway instead")
		}
		// These options are defaulted above, so only set them as overrides
		if serviceOpts.Name != "" {
			req.Name = serviceOpts.Name
		}
		if serviceOpts.HTTPPort != 0 {
			req.Port = serviceOpts.HTTPPort
		}
		if serviceOpts.Connect.Port != 0 {
			req.Connect.SidecarService.Port = serviceOpts.Connect.Port
		}
		if len(serviceOpts.Upstreams) > 0 {
			req.Connect.SidecarService.Proxy.Upstreams = serviceOpts.Upstreams
		}
		req.Meta = serviceOpts.Meta
		req.Namespace = serviceOpts.Namespace
		req.Partition = serviceOpts.Partition
		req.Locality = serviceOpts.Locality
	}

	if err := node.GetClient().Agent().ServiceRegister(req); err != nil {
		return nil, err
	}

	// Create a service and proxy instance.
	//
	// The sidecar must target the service exactly as it was registered. The
	// registration carries no explicit ID, so the agent falls back to the
	// service name as its ID; derive the sidecar's target from req rather than
	// from the default constant so that a Name override, and any
	// Namespace/Partition applied above, is honored.
	sidecarCfg := SidecarConfig{
		Name:         fmt.Sprintf("%s-sidecar", req.Name),
		ServiceID:    req.Name,
		Namespace:    req.Namespace,
		Partition:    req.Partition,
		EnableTProxy: enableTProxy,
	}

	clientConnectProxy, err := NewConnectService(context.Background(), sidecarCfg, []int{libcluster.ServiceUpstreamLocalBindPort}, node, nil)
	if err != nil {
		return nil, err
	}
	deferClean.Add(func() {
		_ = clientConnectProxy.Terminate()
	})

	// disable cleanup functions now that we have an object with a Terminate() function
	deferClean.Reset()

	return clientConnectProxy, nil
}
// ClientsCreate adds numClients agents to cluster, all built from a client-mode agent config
// using the given Consul image name and version. The generated config JSON is logged, and the
// test fails immediately if the agents cannot be added.
func ClientsCreate(t *testing.T, numClients int, image, version string, cluster *libcluster.Cluster) {
	buildCtx := libcluster.NewBuildContext(t, libcluster.BuildOptions{
		ConsulImageName: image,
		ConsulVersion:   version,
	})

	builder := libcluster.NewConfigBuilder(buildCtx).Client()
	agentConf := builder.ToAgentConfig(t)
	t.Logf("Cluster client config:\n%s", agentConf.JSON)

	err := cluster.AddN(*agentConf, numClients, true)
	require.NoError(t, err)
}
// ServiceCreate registers serviceName (port 9999, with a connect sidecar on port 22005) through
// client's agent, then reads it back from the catalog and asserts that exactly one instance
// exists with the expected name and port. It returns the LastIndex reported by that catalog
// query.
func ServiceCreate(t *testing.T, client *api.Client, serviceName string) uint64 {
	const (
		servicePort = 9999
		sidecarPort = 22005
	)

	registration := &api.AgentServiceRegistration{
		Name: serviceName,
		Port: servicePort,
		Connect: &api.AgentServiceConnect{
			SidecarService: &api.AgentServiceRegistration{
				Port: sidecarPort,
			},
		},
	}
	err := client.Agent().ServiceRegister(registration)
	require.NoError(t, err)

	entries, queryMeta, err := client.Catalog().Service(serviceName, "", &api.QueryOptions{})
	require.NoError(t, err)
	require.Len(t, entries, 1)

	registered := entries[0]
	require.Equal(t, serviceName, registered.ServiceName)
	require.Equal(t, servicePort, registered.ServicePort)

	return queryMeta.LastIndex
}
// ServiceHealthBlockingQuery starts a goroutine that performs one health query for serviceName
// with WaitIndex set to waitIndex, and returns two channels (each with a buffer of one). Exactly
// one of them receives a value: the error channel if the query fails or was not answered by the
// streaming backend, otherwise the result channel with the returned service entries.
func ServiceHealthBlockingQuery(client *api.Client, serviceName string, waitIndex uint64) (chan []*api.ServiceEntry, chan error) {
	resultCh := make(chan []*api.ServiceEntry, 1)
	failCh := make(chan error, 1)

	go func() {
		entries, queryMeta, err := client.Health().Service(serviceName, "", false, &api.QueryOptions{WaitIndex: waitIndex})
		switch {
		case err != nil:
			failCh <- err
		case queryMeta.QueryBackend != api.QueryBackendStreaming:
			// A successful answer from any other backend is still a failure for this helper.
			failCh <- fmt.Errorf("invalid backend for this test %s", queryMeta.QueryBackend)
		default:
			resultCh <- entries
		}
	}()

	return resultCh, failCh
}