consul/testing/deployer/sprawl/catalog.go

429 lines
9.9 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package sprawl
import (
"fmt"
"net/http"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testing/deployer/topology"
"github.com/hashicorp/consul/testing/deployer/util"
)
func (s *Sprawl) registerAllServicesToAgents() error {
for _, cluster := range s.topology.Clusters {
if err := s.registerServicesToAgents(cluster); err != nil {
return fmt.Errorf("registerServicesToAgents[%s]: %w", cluster.Name, err)
}
}
return nil
}
func (s *Sprawl) registerAllServicesForDataplaneInstances() error {
for _, cluster := range s.topology.Clusters {
if err := s.registerServicesForDataplaneInstances(cluster); err != nil {
return fmt.Errorf("registerServicesForDataplaneInstances[%s]: %w", cluster.Name, err)
}
}
return nil
}
func (s *Sprawl) registerServicesToAgents(cluster *topology.Cluster) error {
for _, node := range cluster.Nodes {
if !node.RunsWorkloads() || len(node.Services) == 0 || node.Disabled {
continue
}
if !node.IsAgent() {
continue
}
agentClient, err := util.ProxyAPIClient(
node.LocalProxyPort(),
node.LocalAddress(),
8500,
"", /*token will be in request*/
)
if err != nil {
return err
}
for _, svc := range node.Services {
if err := s.registerAgentService(agentClient, cluster, node, svc); err != nil {
return err
}
}
}
return nil
}
func (s *Sprawl) registerAgentService(
agentClient *api.Client,
cluster *topology.Cluster,
node *topology.Node,
svc *topology.Service,
) error {
if !node.IsAgent() {
panic("called wrong method type")
}
if svc.IsMeshGateway {
return nil // handled at startup time for agent-full, but won't be for agent-less
}
var (
logger = s.logger.With("cluster", cluster.Name)
)
reg := &api.AgentServiceRegistration{
ID: svc.ID.Name,
Name: svc.ID.Name,
Port: svc.Port,
Meta: svc.Meta,
}
if cluster.Enterprise {
reg.Namespace = svc.ID.Namespace
reg.Partition = svc.ID.Partition
}
if !svc.DisableServiceMesh {
var upstreams []api.Upstream
for _, u := range svc.Upstreams {
uAPI := api.Upstream{
DestinationPeer: u.Peer,
DestinationName: u.ID.Name,
LocalBindAddress: u.LocalAddress,
LocalBindPort: u.LocalPort,
// Config map[string]interface{} `json:",omitempty" bexpr:"-"`
// MeshGateway MeshGatewayConfig `json:",omitempty"`
}
if cluster.Enterprise {
uAPI.DestinationNamespace = u.ID.Namespace
if u.Peer == "" {
uAPI.DestinationPartition = u.ID.Partition
}
}
upstreams = append(upstreams, uAPI)
}
reg.Connect = &api.AgentServiceConnect{
SidecarService: &api.AgentServiceRegistration{
Proxy: &api.AgentServiceConnectProxyConfig{
Upstreams: upstreams,
},
},
}
}
switch {
case svc.CheckTCP != "":
chk := &api.AgentServiceCheck{
Name: "up",
TCP: svc.CheckTCP,
Interval: "5s",
Timeout: "1s",
}
reg.Checks = append(reg.Checks, chk)
case svc.CheckHTTP != "":
chk := &api.AgentServiceCheck{
Name: "up",
HTTP: svc.CheckHTTP,
Method: "GET",
Interval: "5s",
Timeout: "1s",
}
reg.Checks = append(reg.Checks, chk)
}
// Switch token for every request.
hdr := make(http.Header)
hdr.Set("X-Consul-Token", s.secrets.ReadServiceToken(cluster.Name, svc.ID))
agentClient.SetHeaders(hdr)
RETRY:
if err := agentClient.Agent().ServiceRegister(reg); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("failed to register service %q to node %q: %w", svc.ID, node.ID(), err)
}
logger.Info("registered service to client agent",
"service", svc.ID.Name,
"node", node.Name,
"namespace", svc.ID.Namespace,
"partition", svc.ID.Partition,
)
return nil
}
func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster) error {
for _, node := range cluster.Nodes {
if !node.RunsWorkloads() || len(node.Services) == 0 || node.Disabled {
continue
}
if !node.IsDataplane() {
continue
}
if err := s.registerCatalogNode(cluster, node); err != nil {
return fmt.Errorf("error registering virtual node: %w", err)
}
for _, svc := range node.Services {
if err := s.registerCatalogService(cluster, node, svc); err != nil {
return fmt.Errorf("error registering service: %w", err)
}
if !svc.DisableServiceMesh {
if err := s.registerCatalogSidecarService(cluster, node, svc); err != nil {
return fmt.Errorf("error registering sidecar service: %w", err)
}
}
}
}
return nil
}
func (s *Sprawl) registerCatalogNode(
cluster *topology.Cluster,
node *topology.Node,
) error {
if !node.IsDataplane() {
panic("called wrong method type")
}
var (
client = s.clients[cluster.Name]
logger = s.logger.With("cluster", cluster.Name)
)
reg := &api.CatalogRegistration{
Node: node.PodName(),
Address: node.LocalAddress(),
NodeMeta: map[string]string{
"dataplane-faux": "1",
},
}
if cluster.Enterprise {
reg.Partition = node.Partition
}
// register synthetic node
RETRY:
if _, err := client.Catalog().Register(reg, nil); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("error registering virtual node %s: %w", node.ID(), err)
}
logger.Info("virtual node created",
"node", node.ID(),
)
return nil
}
func (s *Sprawl) registerCatalogService(
cluster *topology.Cluster,
node *topology.Node,
svc *topology.Service,
) error {
if !node.IsDataplane() {
panic("called wrong method type")
}
var (
client = s.clients[cluster.Name]
logger = s.logger.With("cluster", cluster.Name)
)
reg := serviceToCatalogRegistration(cluster, node, svc)
RETRY:
if _, err := client.Catalog().Register(reg, nil); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("error registering service %s to node %s: %w", svc.ID, node.ID(), err)
}
logger.Info("dataplane service created",
"service", svc.ID,
"node", node.ID(),
)
return nil
}
func (s *Sprawl) registerCatalogSidecarService(
cluster *topology.Cluster,
node *topology.Node,
svc *topology.Service,
) error {
if !node.IsDataplane() {
panic("called wrong method type")
}
if svc.DisableServiceMesh {
panic("not valid")
}
var (
client = s.clients[cluster.Name]
logger = s.logger.With("cluster", cluster.Name)
)
pid, reg := serviceToSidecarCatalogRegistration(cluster, node, svc)
RETRY:
if _, err := client.Catalog().Register(reg, nil); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("error registering service %s to node %s: %w", svc.ID, node.ID(), err)
}
logger.Info("dataplane sidecar service created",
"service", pid,
"node", node.ID(),
)
return nil
}
func serviceToCatalogRegistration(
cluster *topology.Cluster,
node *topology.Node,
svc *topology.Service,
) *api.CatalogRegistration {
reg := &api.CatalogRegistration{
Node: node.PodName(),
SkipNodeUpdate: true,
Service: &api.AgentService{
Kind: api.ServiceKindTypical,
ID: svc.ID.Name,
Service: svc.ID.Name,
Meta: svc.Meta,
Port: svc.Port,
Address: node.LocalAddress(),
},
}
if node.HasPublicAddress() {
reg.TaggedAddresses = map[string]string{
"lan": node.LocalAddress(),
"lan_ipv4": node.LocalAddress(),
"wan": node.PublicAddress(),
"wan_ipv4": node.PublicAddress(),
}
}
if cluster.Enterprise {
reg.Partition = svc.ID.Partition
reg.Service.Namespace = svc.ID.Namespace
reg.Service.Partition = svc.ID.Partition
}
if svc.HasCheck() {
chk := &api.HealthCheck{
Name: "external sync",
// Type: "external-sync",
Status: "passing", // TODO
ServiceID: svc.ID.Name,
ServiceName: svc.ID.Name,
Output: "",
}
if cluster.Enterprise {
chk.Namespace = svc.ID.Namespace
chk.Partition = svc.ID.Partition
}
switch {
case svc.CheckTCP != "":
chk.Definition.TCP = svc.CheckTCP
case svc.CheckHTTP != "":
chk.Definition.HTTP = svc.CheckHTTP
chk.Definition.Method = "GET"
}
reg.Checks = append(reg.Checks, chk)
}
return reg
}
func serviceToSidecarCatalogRegistration(
cluster *topology.Cluster,
node *topology.Node,
svc *topology.Service,
) (topology.ServiceID, *api.CatalogRegistration) {
pid := svc.ID
pid.Name += "-sidecar-proxy"
reg := &api.CatalogRegistration{
Node: node.PodName(),
SkipNodeUpdate: true,
Service: &api.AgentService{
Kind: api.ServiceKindConnectProxy,
ID: pid.Name,
Service: pid.Name,
Meta: svc.Meta,
Port: svc.EnvoyPublicListenerPort,
Address: node.LocalAddress(),
Proxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: svc.ID.Name,
DestinationServiceID: svc.ID.Name,
LocalServicePort: svc.Port,
},
},
Checks: []*api.HealthCheck{{
Name: "external sync",
// Type: "external-sync",
Status: "passing", // TODO
ServiceID: pid.Name,
ServiceName: pid.Name,
Definition: api.HealthCheckDefinition{
TCP: fmt.Sprintf("%s:%d", node.LocalAddress(), svc.EnvoyPublicListenerPort),
},
Output: "",
}},
}
if node.HasPublicAddress() {
reg.TaggedAddresses = map[string]string{
"lan": node.LocalAddress(),
"lan_ipv4": node.LocalAddress(),
"wan": node.PublicAddress(),
"wan_ipv4": node.PublicAddress(),
}
}
if cluster.Enterprise {
reg.Partition = pid.Partition
reg.Service.Namespace = pid.Namespace
reg.Service.Partition = pid.Partition
reg.Checks[0].Namespace = pid.Namespace
reg.Checks[0].Partition = pid.Partition
}
for _, u := range svc.Upstreams {
pu := api.Upstream{
DestinationName: u.ID.Name,
DestinationPeer: u.Peer,
LocalBindAddress: u.LocalAddress,
LocalBindPort: u.LocalPort,
}
if cluster.Enterprise {
pu.DestinationNamespace = u.ID.Namespace
if u.Peer == "" {
pu.DestinationPartition = u.ID.Partition
}
}
reg.Service.Proxy.Upstreams = append(reg.Service.Proxy.Upstreams, pu)
}
return pid, reg
}