consul/testing/deployer/sprawl/catalog.go

671 lines
16 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package sprawl
import (
"context"
"fmt"
"net/http"
"time"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/testing/deployer/topology"
"github.com/hashicorp/consul/testing/deployer/util"
)
// registerAllServicesToAgents registers services in agent-ful mode
func (s *Sprawl) registerAllServicesToAgents() error {
for _, cluster := range s.topology.Clusters {
if err := s.registerServicesToAgents(cluster); err != nil {
return fmt.Errorf("registerServicesToAgents[%s]: %w", cluster.Name, err)
}
}
return nil
}
func (s *Sprawl) syncAllServicesForDataplaneInstances() error {
for _, cluster := range s.topology.Clusters {
if err := s.syncWorkloadsForDataplaneInstances(cluster); err != nil {
return fmt.Errorf("syncWorkloadsForDataplaneInstances[%s]: %w", cluster.Name, err)
}
}
return nil
}
func (s *Sprawl) registerServicesToAgents(cluster *topology.Cluster) error {
for _, node := range cluster.Nodes {
if !node.RunsWorkloads() || len(node.Workloads) == 0 || node.Disabled {
continue
}
if !node.IsAgent() {
continue
}
agentClient, err := util.ProxyAPIClient(
node.LocalProxyPort(),
node.LocalAddress(),
8500,
"", /*token will be in request*/
)
if err != nil {
return err
}
for _, wrk := range node.Workloads {
if err := s.registerAgentService(agentClient, cluster, node, wrk); err != nil {
return err
}
}
}
return nil
}
func (s *Sprawl) registerAgentService(
agentClient *api.Client,
cluster *topology.Cluster,
node *topology.Node,
wrk *topology.Workload,
) error {
if !node.IsAgent() {
panic("called wrong method type")
}
if wrk.IsMeshGateway {
return nil // handled at startup time for agent-ful, but won't be for agent-less
}
var (
logger = s.logger.With("cluster", cluster.Name)
)
reg := &api.AgentServiceRegistration{
ID: wrk.ID.Name,
Name: wrk.ID.Name,
Port: wrk.Port,
Meta: wrk.Meta,
}
if cluster.Enterprise {
reg.Namespace = wrk.ID.Namespace
reg.Partition = wrk.ID.Partition
}
if !wrk.DisableServiceMesh {
var upstreams []api.Upstream
for _, us := range wrk.Upstreams {
uAPI := api.Upstream{
DestinationPeer: us.Peer,
DestinationName: us.ID.Name,
LocalBindAddress: us.LocalAddress,
LocalBindPort: us.LocalPort,
// Config map[string]interface{} `json:",omitempty" bexpr:"-"`
// MeshGateway MeshGatewayConfig `json:",omitempty"`
}
if cluster.Enterprise {
uAPI.DestinationNamespace = us.ID.Namespace
if us.Peer == "" {
uAPI.DestinationPartition = us.ID.Partition
}
}
upstreams = append(upstreams, uAPI)
}
reg.Connect = &api.AgentServiceConnect{
SidecarService: &api.AgentServiceRegistration{
Proxy: &api.AgentServiceConnectProxyConfig{
Upstreams: upstreams,
},
},
}
}
switch {
case wrk.CheckTCP != "":
chk := &api.AgentServiceCheck{
Name: "up",
TCP: wrk.CheckTCP,
Interval: "5s",
Timeout: "1s",
}
reg.Checks = append(reg.Checks, chk)
case wrk.CheckHTTP != "":
chk := &api.AgentServiceCheck{
Name: "up",
HTTP: wrk.CheckHTTP,
Method: "GET",
Interval: "5s",
Timeout: "1s",
}
reg.Checks = append(reg.Checks, chk)
}
// Switch token for every request.
hdr := make(http.Header)
hdr.Set("X-Consul-Token", s.secrets.ReadWorkloadToken(cluster.Name, wrk.ID))
agentClient.SetHeaders(hdr)
RETRY:
if err := agentClient.Agent().ServiceRegister(reg); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("failed to register workload %q to node %q: %w", wrk.ID, node.ID(), err)
}
logger.Debug("registered workload to client agent",
"workload", wrk.ID.Name,
"node", node.Name,
"namespace", wrk.ID.Namespace,
"partition", wrk.ID.Partition,
)
return nil
}
// syncWorkloadsForDataplaneInstances register/deregister services in the given cluster
func (s *Sprawl) syncWorkloadsForDataplaneInstances(cluster *topology.Cluster) error {
// registerWorkloadToNode is called when node is not disabled
registerWorkloadToNode := func(node *topology.Node, wrk *topology.Workload) error {
if err := s.registerCatalogServiceV1(cluster, node, wrk); err != nil {
return fmt.Errorf("error registering service: %w", err)
}
if !wrk.DisableServiceMesh {
if err := s.registerCatalogSidecarServiceV1(cluster, node, wrk); err != nil {
return fmt.Errorf("error registering sidecar service: %w", err)
}
}
return nil
}
// deregisterWorkloadFromNode is called when node is disabled
deregisterWorkloadFromNode := func(node *topology.Node, wrk *topology.Workload) error {
if err := s.deregisterCatalogServiceV1(cluster, node, wrk); err != nil {
return fmt.Errorf("error deregistering service: %w", err)
}
if !wrk.DisableServiceMesh {
if err := s.deregisterCatalogSidecarServiceV1(cluster, node, wrk); err != nil {
return fmt.Errorf("error deregistering sidecar service: %w", err)
}
}
return nil
}
var syncWorkload func(node *topology.Node, wrk *topology.Workload) error
for _, node := range cluster.Nodes {
if !node.RunsWorkloads() || len(node.Workloads) == 0 {
continue
}
if !node.IsDataplane() {
continue
}
// Register virtual node service first if node is not disabled
if !node.Disabled {
if err := s.registerCatalogNode(cluster, node); err != nil {
return fmt.Errorf("error registering virtual node: %w", err)
}
}
// Register/deregister services on the node
for _, wrk := range node.Workloads {
if !node.Disabled {
syncWorkload = registerWorkloadToNode
} else {
syncWorkload = deregisterWorkloadFromNode
}
if err := syncWorkload(node, wrk); err != nil {
return err
}
}
// Deregister the virtual node if node is disabled
if node.Disabled {
if err := s.deregisterCatalogNode(cluster, node); err != nil {
return fmt.Errorf("error deregistering virtual node: %w", err)
}
}
}
return nil
}
func (s *Sprawl) registerCatalogNode(
cluster *topology.Cluster,
node *topology.Node,
) error {
return s.registerCatalogNodeV1(cluster, node)
}
func (s *Sprawl) deregisterCatalogNode(
cluster *topology.Cluster,
node *topology.Node,
) error {
return s.deregisterCatalogNodeV1(cluster, node)
}
func (s *Sprawl) writeResource(cluster *topology.Cluster, res *pbresource.Resource) (*pbresource.Resource, error) {
var (
client = s.getResourceClient(cluster.Name)
logger = s.logger.With("cluster", cluster.Name)
)
ctx := s.getManagementTokenContext(context.Background(), cluster.Name)
RETRY:
wrote, err := client.Write(ctx, &pbresource.WriteRequest{
Resource: res,
})
if err != nil {
if isACLNotFound(err) { // TODO: is this right for v2?
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return nil, fmt.Errorf("error creating resource %s: %w", util.IDToString(res.Id), err)
}
logger.Info("resource upserted", "id", util.IDToString(res.Id))
return wrote.Resource, nil
}
func (s *Sprawl) registerCatalogNodeV1(
cluster *topology.Cluster,
node *topology.Node,
) error {
if !node.IsDataplane() {
panic("called wrong method type")
}
var (
client = s.clients[cluster.Name]
logger = s.logger.With("cluster", cluster.Name)
)
reg := &api.CatalogRegistration{
Node: node.PodName(),
Address: node.LocalAddress(),
NodeMeta: map[string]string{
"dataplane-faux": "1",
},
}
if cluster.Enterprise {
reg.Partition = node.Partition
}
// register synthetic node
RETRY:
if _, err := client.Catalog().Register(reg, nil); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("error registering virtual node %s: %w", node.ID(), err)
}
logger.Debug("virtual node created",
"node", node.ID(),
)
return nil
}
func (s *Sprawl) deregisterCatalogNodeV1(
cluster *topology.Cluster,
node *topology.Node,
) error {
if !node.IsDataplane() {
panic("called wrong method type")
}
var (
client = s.clients[cluster.Name]
logger = s.logger.With("cluster", cluster.Name)
)
dereg := &api.CatalogDeregistration{
Node: node.PodName(),
Address: node.LocalAddress(),
}
if cluster.Enterprise {
dereg.Partition = node.Partition
}
// deregister synthetic node
RETRY:
if _, err := client.Catalog().Deregister(dereg, nil); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("error deregistering virtual node %s: %w", node.ID(), err)
}
logger.Info("virtual node removed",
"node", node.ID(),
)
return nil
}
func (s *Sprawl) deregisterCatalogServiceV1(
cluster *topology.Cluster,
node *topology.Node,
wrk *topology.Workload,
) error {
if !node.IsDataplane() {
panic("called wrong method type")
}
var (
client = s.clients[cluster.Name]
logger = s.logger.With("cluster", cluster.Name)
)
dereg := &api.CatalogDeregistration{
Node: node.PodName(),
ServiceID: wrk.ID.Name,
}
RETRY:
if _, err := client.Catalog().Deregister(dereg, nil); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("error deregistering service %s at node %s: %w", wrk.ID, node.ID(), err)
}
logger.Info("dataplane service removed",
"service", wrk.ID,
"node", node.ID(),
)
return nil
}
func (s *Sprawl) registerCatalogServiceV1(
cluster *topology.Cluster,
node *topology.Node,
wrk *topology.Workload,
) error {
if !node.IsDataplane() {
panic("called wrong method type")
}
var (
client = s.clients[cluster.Name]
logger = s.logger.With("cluster", cluster.Name)
)
reg := workloadToCatalogRegistration(cluster, node, wrk)
RETRY:
if _, err := client.Catalog().Register(reg, nil); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("error registering service %s to node %s: %w", wrk.ID, node.ID(), err)
}
logger.Debug("dataplane service created",
"service", wrk.ID,
"node", node.ID(),
)
return nil
}
func (s *Sprawl) deregisterCatalogSidecarServiceV1(
cluster *topology.Cluster,
node *topology.Node,
wrk *topology.Workload,
) error {
if !node.IsDataplane() {
panic("called wrong method type")
}
if wrk.DisableServiceMesh {
panic("not valid")
}
var (
client = s.clients[cluster.Name]
logger = s.logger.With("cluster", cluster.Name)
)
pid := wrk.ID
pid.Name += "-sidecar-proxy"
dereg := &api.CatalogDeregistration{
Node: node.PodName(),
ServiceID: pid.Name,
}
RETRY:
if _, err := client.Catalog().Deregister(dereg, nil); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("error deregistering service %s to node %s: %w", wrk.ID, node.ID(), err)
}
logger.Info("dataplane sidecar service removed",
"service", pid,
"node", node.ID(),
)
return nil
}
func (s *Sprawl) registerCatalogSidecarServiceV1(
cluster *topology.Cluster,
node *topology.Node,
wrk *topology.Workload,
) error {
if !node.IsDataplane() {
panic("called wrong method type")
}
if wrk.DisableServiceMesh {
panic("not valid")
}
var (
client = s.clients[cluster.Name]
logger = s.logger.With("cluster", cluster.Name)
)
pid, reg := workloadToSidecarCatalogRegistration(cluster, node, wrk)
RETRY:
if _, err := client.Catalog().Register(reg, nil); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("error registering service %s to node %s: %w", wrk.ID, node.ID(), err)
}
logger.Debug("dataplane sidecar service created",
"service", pid,
"node", node.ID(),
)
return nil
}
type Resource[V proto.Message] struct {
Resource *pbresource.Resource
Data V
}
func (r *Resource[V]) Build() (*pbresource.Resource, error) {
anyData, err := anypb.New(r.Data)
if err != nil {
return nil, err
}
r.Resource.Data = anyData
return r.Resource, nil
}
func workloadToCatalogRegistration(
cluster *topology.Cluster,
node *topology.Node,
wrk *topology.Workload,
) *api.CatalogRegistration {
reg := &api.CatalogRegistration{
Node: node.PodName(),
SkipNodeUpdate: true,
Service: &api.AgentService{
Kind: api.ServiceKindTypical,
ID: wrk.ID.Name,
Service: wrk.ID.Name,
Meta: wrk.Meta,
Port: wrk.Port,
Address: node.LocalAddress(),
},
}
if wrk.IsMeshGateway {
reg.Service.Kind = api.ServiceKindMeshGateway
reg.Service.Proxy = &api.AgentServiceConnectProxyConfig{
Config: map[string]interface{}{
"envoy_gateway_no_default_bind": true,
"envoy_gateway_bind_tagged_addresses": true,
},
MeshGateway: api.MeshGatewayConfig{
Mode: api.MeshGatewayModeLocal,
},
}
}
if node.HasPublicAddress() {
reg.TaggedAddresses = map[string]string{
"lan": node.LocalAddress(),
"lan_ipv4": node.LocalAddress(),
"wan": node.PublicAddress(),
"wan_ipv4": node.PublicAddress(),
}
// TODO: not sure what the difference is between these, but with just the
// top-level set, it appeared to not get set in either :/
reg.Service.TaggedAddresses = map[string]api.ServiceAddress{
"lan": {
Address: node.LocalAddress(),
Port: wrk.Port,
},
"lan_ipv4": {
Address: node.LocalAddress(),
Port: wrk.Port,
},
"wan": {
Address: node.PublicAddress(),
Port: wrk.Port,
},
"wan_ipv4": {
Address: node.PublicAddress(),
Port: wrk.Port,
},
}
}
if cluster.Enterprise {
reg.Partition = wrk.ID.Partition
reg.Service.Namespace = wrk.ID.Namespace
reg.Service.Partition = wrk.ID.Partition
}
if wrk.HasCheck() {
chk := &api.HealthCheck{
Name: "external sync",
// Type: "external-sync",
Status: "passing", // TODO
ServiceID: wrk.ID.Name,
ServiceName: wrk.ID.Name,
Output: "",
}
if cluster.Enterprise {
chk.Namespace = wrk.ID.Namespace
chk.Partition = wrk.ID.Partition
}
switch {
case wrk.CheckTCP != "":
chk.Definition.TCP = wrk.CheckTCP
case wrk.CheckHTTP != "":
chk.Definition.HTTP = wrk.CheckHTTP
chk.Definition.Method = "GET"
}
reg.Checks = append(reg.Checks, chk)
}
return reg
}
func workloadToSidecarCatalogRegistration(
cluster *topology.Cluster,
node *topology.Node,
wrk *topology.Workload,
) (topology.ID, *api.CatalogRegistration) {
pid := wrk.ID
pid.Name += "-sidecar-proxy"
reg := &api.CatalogRegistration{
Node: node.PodName(),
SkipNodeUpdate: true,
Service: &api.AgentService{
Kind: api.ServiceKindConnectProxy,
ID: pid.Name,
Service: pid.Name,
Meta: wrk.Meta,
Port: wrk.EnvoyPublicListenerPort,
Address: node.LocalAddress(),
Proxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: wrk.ID.Name,
DestinationServiceID: wrk.ID.Name,
LocalServicePort: wrk.Port,
},
},
Checks: []*api.HealthCheck{{
Name: "external sync",
// Type: "external-sync",
Status: "passing", // TODO
ServiceID: pid.Name,
ServiceName: pid.Name,
Definition: api.HealthCheckDefinition{
TCP: fmt.Sprintf("%s:%d", node.LocalAddress(), wrk.EnvoyPublicListenerPort),
},
Output: "",
}},
}
if node.HasPublicAddress() {
reg.TaggedAddresses = map[string]string{
"lan": node.LocalAddress(),
"lan_ipv4": node.LocalAddress(),
"wan": node.PublicAddress(),
"wan_ipv4": node.PublicAddress(),
}
}
if cluster.Enterprise {
reg.Partition = pid.Partition
reg.Service.Namespace = pid.Namespace
reg.Service.Partition = pid.Partition
reg.Checks[0].Namespace = pid.Namespace
reg.Checks[0].Partition = pid.Partition
}
for _, us := range wrk.Upstreams {
pu := api.Upstream{
DestinationName: us.ID.Name,
DestinationPeer: us.Peer,
LocalBindAddress: us.LocalAddress,
LocalBindPort: us.LocalPort,
}
if cluster.Enterprise {
pu.DestinationNamespace = us.ID.Namespace
if us.Peer == "" {
pu.DestinationPartition = us.ID.Partition
}
}
reg.Service.Proxy.Upstreams = append(reg.Service.Proxy.Upstreams, pu)
}
return pid, reg
}