mirror of https://github.com/status-im/consul.git
671 lines
16 KiB
Go
671 lines
16 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package sprawl
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
"google.golang.org/protobuf/types/known/anypb"
|
|
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
|
"github.com/hashicorp/consul/testing/deployer/topology"
|
|
"github.com/hashicorp/consul/testing/deployer/util"
|
|
)
|
|
|
|
// registerAllServicesToAgents registers services in agent-ful mode
|
|
func (s *Sprawl) registerAllServicesToAgents() error {
|
|
for _, cluster := range s.topology.Clusters {
|
|
if err := s.registerServicesToAgents(cluster); err != nil {
|
|
return fmt.Errorf("registerServicesToAgents[%s]: %w", cluster.Name, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) syncAllServicesForDataplaneInstances() error {
|
|
for _, cluster := range s.topology.Clusters {
|
|
if err := s.syncWorkloadsForDataplaneInstances(cluster); err != nil {
|
|
return fmt.Errorf("syncWorkloadsForDataplaneInstances[%s]: %w", cluster.Name, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) registerServicesToAgents(cluster *topology.Cluster) error {
|
|
for _, node := range cluster.Nodes {
|
|
if !node.RunsWorkloads() || len(node.Workloads) == 0 || node.Disabled {
|
|
continue
|
|
}
|
|
|
|
if !node.IsAgent() {
|
|
continue
|
|
}
|
|
|
|
agentClient, err := util.ProxyAPIClient(
|
|
node.LocalProxyPort(),
|
|
node.LocalAddress(),
|
|
8500,
|
|
"", /*token will be in request*/
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, wrk := range node.Workloads {
|
|
if err := s.registerAgentService(agentClient, cluster, node, wrk); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) registerAgentService(
|
|
agentClient *api.Client,
|
|
cluster *topology.Cluster,
|
|
node *topology.Node,
|
|
wrk *topology.Workload,
|
|
) error {
|
|
if !node.IsAgent() {
|
|
panic("called wrong method type")
|
|
}
|
|
|
|
if wrk.IsMeshGateway {
|
|
return nil // handled at startup time for agent-ful, but won't be for agent-less
|
|
}
|
|
|
|
var (
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
)
|
|
|
|
reg := &api.AgentServiceRegistration{
|
|
ID: wrk.ID.Name,
|
|
Name: wrk.ID.Name,
|
|
Port: wrk.Port,
|
|
Meta: wrk.Meta,
|
|
}
|
|
if cluster.Enterprise {
|
|
reg.Namespace = wrk.ID.Namespace
|
|
reg.Partition = wrk.ID.Partition
|
|
}
|
|
|
|
if !wrk.DisableServiceMesh {
|
|
var upstreams []api.Upstream
|
|
for _, us := range wrk.Upstreams {
|
|
uAPI := api.Upstream{
|
|
DestinationPeer: us.Peer,
|
|
DestinationName: us.ID.Name,
|
|
LocalBindAddress: us.LocalAddress,
|
|
LocalBindPort: us.LocalPort,
|
|
// Config map[string]interface{} `json:",omitempty" bexpr:"-"`
|
|
// MeshGateway MeshGatewayConfig `json:",omitempty"`
|
|
}
|
|
if cluster.Enterprise {
|
|
uAPI.DestinationNamespace = us.ID.Namespace
|
|
if us.Peer == "" {
|
|
uAPI.DestinationPartition = us.ID.Partition
|
|
}
|
|
}
|
|
upstreams = append(upstreams, uAPI)
|
|
}
|
|
reg.Connect = &api.AgentServiceConnect{
|
|
SidecarService: &api.AgentServiceRegistration{
|
|
Proxy: &api.AgentServiceConnectProxyConfig{
|
|
Upstreams: upstreams,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
switch {
|
|
case wrk.CheckTCP != "":
|
|
chk := &api.AgentServiceCheck{
|
|
Name: "up",
|
|
TCP: wrk.CheckTCP,
|
|
Interval: "5s",
|
|
Timeout: "1s",
|
|
}
|
|
reg.Checks = append(reg.Checks, chk)
|
|
case wrk.CheckHTTP != "":
|
|
chk := &api.AgentServiceCheck{
|
|
Name: "up",
|
|
HTTP: wrk.CheckHTTP,
|
|
Method: "GET",
|
|
Interval: "5s",
|
|
Timeout: "1s",
|
|
}
|
|
reg.Checks = append(reg.Checks, chk)
|
|
}
|
|
|
|
// Switch token for every request.
|
|
hdr := make(http.Header)
|
|
hdr.Set("X-Consul-Token", s.secrets.ReadWorkloadToken(cluster.Name, wrk.ID))
|
|
agentClient.SetHeaders(hdr)
|
|
|
|
RETRY:
|
|
if err := agentClient.Agent().ServiceRegister(reg); err != nil {
|
|
if isACLNotFound(err) {
|
|
time.Sleep(50 * time.Millisecond)
|
|
goto RETRY
|
|
}
|
|
return fmt.Errorf("failed to register workload %q to node %q: %w", wrk.ID, node.ID(), err)
|
|
}
|
|
|
|
logger.Debug("registered workload to client agent",
|
|
"workload", wrk.ID.Name,
|
|
"node", node.Name,
|
|
"namespace", wrk.ID.Namespace,
|
|
"partition", wrk.ID.Partition,
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
// syncWorkloadsForDataplaneInstances register/deregister services in the given cluster
|
|
func (s *Sprawl) syncWorkloadsForDataplaneInstances(cluster *topology.Cluster) error {
|
|
// registerWorkloadToNode is called when node is not disabled
|
|
registerWorkloadToNode := func(node *topology.Node, wrk *topology.Workload) error {
|
|
if err := s.registerCatalogServiceV1(cluster, node, wrk); err != nil {
|
|
return fmt.Errorf("error registering service: %w", err)
|
|
}
|
|
if !wrk.DisableServiceMesh {
|
|
if err := s.registerCatalogSidecarServiceV1(cluster, node, wrk); err != nil {
|
|
return fmt.Errorf("error registering sidecar service: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// deregisterWorkloadFromNode is called when node is disabled
|
|
deregisterWorkloadFromNode := func(node *topology.Node, wrk *topology.Workload) error {
|
|
if err := s.deregisterCatalogServiceV1(cluster, node, wrk); err != nil {
|
|
return fmt.Errorf("error deregistering service: %w", err)
|
|
}
|
|
if !wrk.DisableServiceMesh {
|
|
if err := s.deregisterCatalogSidecarServiceV1(cluster, node, wrk); err != nil {
|
|
return fmt.Errorf("error deregistering sidecar service: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
var syncWorkload func(node *topology.Node, wrk *topology.Workload) error
|
|
|
|
for _, node := range cluster.Nodes {
|
|
if !node.RunsWorkloads() || len(node.Workloads) == 0 {
|
|
continue
|
|
}
|
|
|
|
if !node.IsDataplane() {
|
|
continue
|
|
}
|
|
|
|
// Register virtual node service first if node is not disabled
|
|
if !node.Disabled {
|
|
if err := s.registerCatalogNode(cluster, node); err != nil {
|
|
return fmt.Errorf("error registering virtual node: %w", err)
|
|
}
|
|
}
|
|
|
|
// Register/deregister services on the node
|
|
for _, wrk := range node.Workloads {
|
|
if !node.Disabled {
|
|
syncWorkload = registerWorkloadToNode
|
|
} else {
|
|
syncWorkload = deregisterWorkloadFromNode
|
|
}
|
|
if err := syncWorkload(node, wrk); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Deregister the virtual node if node is disabled
|
|
if node.Disabled {
|
|
if err := s.deregisterCatalogNode(cluster, node); err != nil {
|
|
return fmt.Errorf("error deregistering virtual node: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) registerCatalogNode(
|
|
cluster *topology.Cluster,
|
|
node *topology.Node,
|
|
) error {
|
|
return s.registerCatalogNodeV1(cluster, node)
|
|
}
|
|
|
|
func (s *Sprawl) deregisterCatalogNode(
|
|
cluster *topology.Cluster,
|
|
node *topology.Node,
|
|
) error {
|
|
return s.deregisterCatalogNodeV1(cluster, node)
|
|
}
|
|
|
|
func (s *Sprawl) writeResource(cluster *topology.Cluster, res *pbresource.Resource) (*pbresource.Resource, error) {
|
|
var (
|
|
client = s.getResourceClient(cluster.Name)
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
)
|
|
|
|
ctx := s.getManagementTokenContext(context.Background(), cluster.Name)
|
|
RETRY:
|
|
wrote, err := client.Write(ctx, &pbresource.WriteRequest{
|
|
Resource: res,
|
|
})
|
|
if err != nil {
|
|
if isACLNotFound(err) { // TODO: is this right for v2?
|
|
time.Sleep(50 * time.Millisecond)
|
|
goto RETRY
|
|
}
|
|
return nil, fmt.Errorf("error creating resource %s: %w", util.IDToString(res.Id), err)
|
|
}
|
|
|
|
logger.Info("resource upserted", "id", util.IDToString(res.Id))
|
|
return wrote.Resource, nil
|
|
}
|
|
|
|
func (s *Sprawl) registerCatalogNodeV1(
|
|
cluster *topology.Cluster,
|
|
node *topology.Node,
|
|
) error {
|
|
if !node.IsDataplane() {
|
|
panic("called wrong method type")
|
|
}
|
|
|
|
var (
|
|
client = s.clients[cluster.Name]
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
)
|
|
|
|
reg := &api.CatalogRegistration{
|
|
Node: node.PodName(),
|
|
Address: node.LocalAddress(),
|
|
NodeMeta: map[string]string{
|
|
"dataplane-faux": "1",
|
|
},
|
|
}
|
|
if cluster.Enterprise {
|
|
reg.Partition = node.Partition
|
|
}
|
|
|
|
// register synthetic node
|
|
RETRY:
|
|
if _, err := client.Catalog().Register(reg, nil); err != nil {
|
|
if isACLNotFound(err) {
|
|
time.Sleep(50 * time.Millisecond)
|
|
goto RETRY
|
|
}
|
|
return fmt.Errorf("error registering virtual node %s: %w", node.ID(), err)
|
|
}
|
|
|
|
logger.Debug("virtual node created",
|
|
"node", node.ID(),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) deregisterCatalogNodeV1(
|
|
cluster *topology.Cluster,
|
|
node *topology.Node,
|
|
) error {
|
|
if !node.IsDataplane() {
|
|
panic("called wrong method type")
|
|
}
|
|
|
|
var (
|
|
client = s.clients[cluster.Name]
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
)
|
|
|
|
dereg := &api.CatalogDeregistration{
|
|
Node: node.PodName(),
|
|
Address: node.LocalAddress(),
|
|
}
|
|
if cluster.Enterprise {
|
|
dereg.Partition = node.Partition
|
|
}
|
|
|
|
// deregister synthetic node
|
|
RETRY:
|
|
if _, err := client.Catalog().Deregister(dereg, nil); err != nil {
|
|
if isACLNotFound(err) {
|
|
time.Sleep(50 * time.Millisecond)
|
|
goto RETRY
|
|
}
|
|
return fmt.Errorf("error deregistering virtual node %s: %w", node.ID(), err)
|
|
}
|
|
|
|
logger.Info("virtual node removed",
|
|
"node", node.ID(),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) deregisterCatalogServiceV1(
|
|
cluster *topology.Cluster,
|
|
node *topology.Node,
|
|
wrk *topology.Workload,
|
|
) error {
|
|
if !node.IsDataplane() {
|
|
panic("called wrong method type")
|
|
}
|
|
|
|
var (
|
|
client = s.clients[cluster.Name]
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
)
|
|
|
|
dereg := &api.CatalogDeregistration{
|
|
Node: node.PodName(),
|
|
ServiceID: wrk.ID.Name,
|
|
}
|
|
RETRY:
|
|
if _, err := client.Catalog().Deregister(dereg, nil); err != nil {
|
|
if isACLNotFound(err) {
|
|
time.Sleep(50 * time.Millisecond)
|
|
goto RETRY
|
|
}
|
|
return fmt.Errorf("error deregistering service %s at node %s: %w", wrk.ID, node.ID(), err)
|
|
}
|
|
|
|
logger.Info("dataplane service removed",
|
|
"service", wrk.ID,
|
|
"node", node.ID(),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) registerCatalogServiceV1(
|
|
cluster *topology.Cluster,
|
|
node *topology.Node,
|
|
wrk *topology.Workload,
|
|
) error {
|
|
if !node.IsDataplane() {
|
|
panic("called wrong method type")
|
|
}
|
|
|
|
var (
|
|
client = s.clients[cluster.Name]
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
)
|
|
|
|
reg := workloadToCatalogRegistration(cluster, node, wrk)
|
|
|
|
RETRY:
|
|
if _, err := client.Catalog().Register(reg, nil); err != nil {
|
|
if isACLNotFound(err) {
|
|
time.Sleep(50 * time.Millisecond)
|
|
goto RETRY
|
|
}
|
|
return fmt.Errorf("error registering service %s to node %s: %w", wrk.ID, node.ID(), err)
|
|
}
|
|
|
|
logger.Debug("dataplane service created",
|
|
"service", wrk.ID,
|
|
"node", node.ID(),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) deregisterCatalogSidecarServiceV1(
|
|
cluster *topology.Cluster,
|
|
node *topology.Node,
|
|
wrk *topology.Workload,
|
|
) error {
|
|
if !node.IsDataplane() {
|
|
panic("called wrong method type")
|
|
}
|
|
if wrk.DisableServiceMesh {
|
|
panic("not valid")
|
|
}
|
|
|
|
var (
|
|
client = s.clients[cluster.Name]
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
)
|
|
|
|
pid := wrk.ID
|
|
pid.Name += "-sidecar-proxy"
|
|
dereg := &api.CatalogDeregistration{
|
|
Node: node.PodName(),
|
|
ServiceID: pid.Name,
|
|
}
|
|
|
|
RETRY:
|
|
if _, err := client.Catalog().Deregister(dereg, nil); err != nil {
|
|
if isACLNotFound(err) {
|
|
time.Sleep(50 * time.Millisecond)
|
|
goto RETRY
|
|
}
|
|
return fmt.Errorf("error deregistering service %s to node %s: %w", wrk.ID, node.ID(), err)
|
|
}
|
|
|
|
logger.Info("dataplane sidecar service removed",
|
|
"service", pid,
|
|
"node", node.ID(),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) registerCatalogSidecarServiceV1(
|
|
cluster *topology.Cluster,
|
|
node *topology.Node,
|
|
wrk *topology.Workload,
|
|
) error {
|
|
if !node.IsDataplane() {
|
|
panic("called wrong method type")
|
|
}
|
|
if wrk.DisableServiceMesh {
|
|
panic("not valid")
|
|
}
|
|
|
|
var (
|
|
client = s.clients[cluster.Name]
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
)
|
|
|
|
pid, reg := workloadToSidecarCatalogRegistration(cluster, node, wrk)
|
|
RETRY:
|
|
if _, err := client.Catalog().Register(reg, nil); err != nil {
|
|
if isACLNotFound(err) {
|
|
time.Sleep(50 * time.Millisecond)
|
|
goto RETRY
|
|
}
|
|
return fmt.Errorf("error registering service %s to node %s: %w", wrk.ID, node.ID(), err)
|
|
}
|
|
|
|
logger.Debug("dataplane sidecar service created",
|
|
"service", pid,
|
|
"node", node.ID(),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
type Resource[V proto.Message] struct {
|
|
Resource *pbresource.Resource
|
|
Data V
|
|
}
|
|
|
|
func (r *Resource[V]) Build() (*pbresource.Resource, error) {
|
|
anyData, err := anypb.New(r.Data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
r.Resource.Data = anyData
|
|
return r.Resource, nil
|
|
}
|
|
|
|
func workloadToCatalogRegistration(
|
|
cluster *topology.Cluster,
|
|
node *topology.Node,
|
|
wrk *topology.Workload,
|
|
) *api.CatalogRegistration {
|
|
reg := &api.CatalogRegistration{
|
|
Node: node.PodName(),
|
|
SkipNodeUpdate: true,
|
|
Service: &api.AgentService{
|
|
Kind: api.ServiceKindTypical,
|
|
ID: wrk.ID.Name,
|
|
Service: wrk.ID.Name,
|
|
Meta: wrk.Meta,
|
|
Port: wrk.Port,
|
|
Address: node.LocalAddress(),
|
|
},
|
|
}
|
|
if wrk.IsMeshGateway {
|
|
reg.Service.Kind = api.ServiceKindMeshGateway
|
|
reg.Service.Proxy = &api.AgentServiceConnectProxyConfig{
|
|
Config: map[string]interface{}{
|
|
"envoy_gateway_no_default_bind": true,
|
|
"envoy_gateway_bind_tagged_addresses": true,
|
|
},
|
|
MeshGateway: api.MeshGatewayConfig{
|
|
Mode: api.MeshGatewayModeLocal,
|
|
},
|
|
}
|
|
}
|
|
if node.HasPublicAddress() {
|
|
reg.TaggedAddresses = map[string]string{
|
|
"lan": node.LocalAddress(),
|
|
"lan_ipv4": node.LocalAddress(),
|
|
"wan": node.PublicAddress(),
|
|
"wan_ipv4": node.PublicAddress(),
|
|
}
|
|
// TODO: not sure what the difference is between these, but with just the
|
|
// top-level set, it appeared to not get set in either :/
|
|
reg.Service.TaggedAddresses = map[string]api.ServiceAddress{
|
|
"lan": {
|
|
Address: node.LocalAddress(),
|
|
Port: wrk.Port,
|
|
},
|
|
"lan_ipv4": {
|
|
Address: node.LocalAddress(),
|
|
Port: wrk.Port,
|
|
},
|
|
"wan": {
|
|
Address: node.PublicAddress(),
|
|
Port: wrk.Port,
|
|
},
|
|
"wan_ipv4": {
|
|
Address: node.PublicAddress(),
|
|
Port: wrk.Port,
|
|
},
|
|
}
|
|
}
|
|
if cluster.Enterprise {
|
|
reg.Partition = wrk.ID.Partition
|
|
reg.Service.Namespace = wrk.ID.Namespace
|
|
reg.Service.Partition = wrk.ID.Partition
|
|
}
|
|
|
|
if wrk.HasCheck() {
|
|
chk := &api.HealthCheck{
|
|
Name: "external sync",
|
|
// Type: "external-sync",
|
|
Status: "passing", // TODO
|
|
ServiceID: wrk.ID.Name,
|
|
ServiceName: wrk.ID.Name,
|
|
Output: "",
|
|
}
|
|
if cluster.Enterprise {
|
|
chk.Namespace = wrk.ID.Namespace
|
|
chk.Partition = wrk.ID.Partition
|
|
}
|
|
switch {
|
|
case wrk.CheckTCP != "":
|
|
chk.Definition.TCP = wrk.CheckTCP
|
|
case wrk.CheckHTTP != "":
|
|
chk.Definition.HTTP = wrk.CheckHTTP
|
|
chk.Definition.Method = "GET"
|
|
}
|
|
reg.Checks = append(reg.Checks, chk)
|
|
}
|
|
return reg
|
|
}
|
|
|
|
func workloadToSidecarCatalogRegistration(
|
|
cluster *topology.Cluster,
|
|
node *topology.Node,
|
|
wrk *topology.Workload,
|
|
) (topology.ID, *api.CatalogRegistration) {
|
|
pid := wrk.ID
|
|
pid.Name += "-sidecar-proxy"
|
|
reg := &api.CatalogRegistration{
|
|
Node: node.PodName(),
|
|
SkipNodeUpdate: true,
|
|
Service: &api.AgentService{
|
|
Kind: api.ServiceKindConnectProxy,
|
|
ID: pid.Name,
|
|
Service: pid.Name,
|
|
Meta: wrk.Meta,
|
|
Port: wrk.EnvoyPublicListenerPort,
|
|
Address: node.LocalAddress(),
|
|
Proxy: &api.AgentServiceConnectProxyConfig{
|
|
DestinationServiceName: wrk.ID.Name,
|
|
DestinationServiceID: wrk.ID.Name,
|
|
LocalServicePort: wrk.Port,
|
|
},
|
|
},
|
|
Checks: []*api.HealthCheck{{
|
|
Name: "external sync",
|
|
// Type: "external-sync",
|
|
Status: "passing", // TODO
|
|
ServiceID: pid.Name,
|
|
ServiceName: pid.Name,
|
|
Definition: api.HealthCheckDefinition{
|
|
TCP: fmt.Sprintf("%s:%d", node.LocalAddress(), wrk.EnvoyPublicListenerPort),
|
|
},
|
|
Output: "",
|
|
}},
|
|
}
|
|
if node.HasPublicAddress() {
|
|
reg.TaggedAddresses = map[string]string{
|
|
"lan": node.LocalAddress(),
|
|
"lan_ipv4": node.LocalAddress(),
|
|
"wan": node.PublicAddress(),
|
|
"wan_ipv4": node.PublicAddress(),
|
|
}
|
|
}
|
|
if cluster.Enterprise {
|
|
reg.Partition = pid.Partition
|
|
reg.Service.Namespace = pid.Namespace
|
|
reg.Service.Partition = pid.Partition
|
|
reg.Checks[0].Namespace = pid.Namespace
|
|
reg.Checks[0].Partition = pid.Partition
|
|
}
|
|
|
|
for _, us := range wrk.Upstreams {
|
|
pu := api.Upstream{
|
|
DestinationName: us.ID.Name,
|
|
DestinationPeer: us.Peer,
|
|
LocalBindAddress: us.LocalAddress,
|
|
LocalBindPort: us.LocalPort,
|
|
}
|
|
if cluster.Enterprise {
|
|
pu.DestinationNamespace = us.ID.Namespace
|
|
if us.Peer == "" {
|
|
pu.DestinationPartition = us.ID.Partition
|
|
}
|
|
}
|
|
reg.Service.Proxy.Upstreams = append(reg.Service.Proxy.Upstreams, pu)
|
|
}
|
|
|
|
return pid, reg
|
|
}
|