test/deployer: add the method of deregistering services (#19525)

This commit is contained in:
cskh 2023-11-07 17:29:13 -05:00 committed by GitHub
parent 48d7d4a0fe
commit 8d6545ec43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 258 additions and 81 deletions

View File

@ -204,7 +204,7 @@ func Test_Snapshot_Restore_Agentless(t *testing.T) {
// Add a new static-server
cfg = sp.Config()
cluster = cfg.Cluster("dc1")
cluster.Nodes[3].Disabled = false // client 3 -- static-server
cluster.Nodes[3].Disabled = false // client 3 -- new static-server
require.NoError(t, sp.Relaunch(cfg))
// Ensure the static-client connected to static-server

View File

@ -182,7 +182,7 @@ func (s *Sprawl) assignIPAddresses() error {
return fmt.Errorf("unknown network %q", addr.Network)
}
addr.IPAddress = net.IPByIndex(node.Index)
s.logger.Info("assign addr", "node", node.Name, "addr", addr.IPAddress)
s.logger.Info("assign addr", "node", node.Name, "addr", addr.IPAddress, "enabled", !node.Disabled)
}
}
}
@ -260,8 +260,11 @@ func (s *Sprawl) initConsulServers() error {
return fmt.Errorf("populateInitialConfigEntries[%s]: %w", cluster.Name, err)
}
if err := s.populateInitialResources(cluster); err != nil {
return fmt.Errorf("populateInitialResources[%s]: %w", cluster.Name, err)
if cluster.EnableV2 {
// Resources are available only in V2
if err := s.populateInitialResources(cluster); err != nil {
return fmt.Errorf("populateInitialResources[%s]: %w", cluster.Name, err)
}
}
if err := s.createAnonymousToken(cluster); err != nil {
@ -300,8 +303,8 @@ func (s *Sprawl) createFirstTime() error {
return fmt.Errorf("createAllServiceTokens: %w", err)
}
if err := s.registerAllServicesForDataplaneInstances(); err != nil {
return fmt.Errorf("registerAllServicesForDataplaneInstances: %w", err)
if err := s.syncAllServicesForDataplaneInstances(); err != nil {
return fmt.Errorf("syncAllServicesForDataplaneInstances: %w", err)
}
// We can do this ahead, because we've incrementally run terraform as
@ -368,8 +371,8 @@ func (s *Sprawl) preRegenTasks() error {
return fmt.Errorf("createAllServiceTokens: %w", err)
}
if err := s.registerAllServicesForDataplaneInstances(); err != nil {
return fmt.Errorf("registerAllServicesForDataplaneInstances: %w", err)
if err := s.syncAllServicesForDataplaneInstances(); err != nil {
return fmt.Errorf("syncAllServicesForDataplaneInstances: %w", err)
}
return nil

View File

@ -21,6 +21,7 @@ import (
"github.com/hashicorp/consul/testing/deployer/util"
)
// registerAllServicesToAgents registers services in agent-ful mode
func (s *Sprawl) registerAllServicesToAgents() error {
for _, cluster := range s.topology.Clusters {
if err := s.registerServicesToAgents(cluster); err != nil {
@ -30,10 +31,10 @@ func (s *Sprawl) registerAllServicesToAgents() error {
return nil
}
func (s *Sprawl) registerAllServicesForDataplaneInstances() error {
func (s *Sprawl) syncAllServicesForDataplaneInstances() error {
for _, cluster := range s.topology.Clusters {
if err := s.registerServicesForDataplaneInstances(cluster); err != nil {
return fmt.Errorf("registerServicesForDataplaneInstances[%s]: %w", cluster.Name, err)
if err := s.syncServicesForDataplaneInstances(cluster); err != nil {
return fmt.Errorf("syncServicesForDataplaneInstances[%s]: %w", cluster.Name, err)
}
}
return nil
@ -86,7 +87,7 @@ func (s *Sprawl) registerAgentService(
}
if svc.IsMeshGateway {
return nil // handled at startup time for agent-full, but won't be for agent-less
return nil // handled at startup time for agent-ful, but won't be for agent-less
}
var (
@ -176,11 +177,94 @@ RETRY:
return nil
}
func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster) error {
// syncServicesForDataplaneInstances register/deregister services in the given cluster
func (s *Sprawl) syncServicesForDataplaneInstances(cluster *topology.Cluster) error {
identityInfo := make(map[topology.ServiceID]*Resource[*pbauth.WorkloadIdentity])
// registerServiceAtNode is called when node is not disabled
registerServiceAtNode := func(node *topology.Node, svc *topology.Service) error {
if node.IsV2() {
pending := serviceInstanceToResources(node, svc)
workloadID := topology.NewServiceID(svc.WorkloadIdentity, svc.ID.Namespace, svc.ID.Partition)
if _, ok := identityInfo[workloadID]; !ok {
identityInfo[workloadID] = pending.WorkloadIdentity
}
// Write workload
res, err := pending.Workload.Build()
if err != nil {
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.Workload.Resource.Id), err)
}
workload, err := s.writeResource(cluster, res)
if err != nil {
return err
}
// Write check linked to workload
for _, check := range pending.HealthStatuses {
check.Resource.Owner = workload.Id
res, err := check.Build()
if err != nil {
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(check.Resource.Id), err)
}
if _, err := s.writeResource(cluster, res); err != nil {
return err
}
}
// maybe write destinations
if pending.Destinations != nil {
res, err := pending.Destinations.Build()
if err != nil {
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.Destinations.Resource.Id), err)
}
if _, err := s.writeResource(cluster, res); err != nil {
return err
}
}
if pending.ProxyConfiguration != nil {
res, err := pending.ProxyConfiguration.Build()
if err != nil {
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.ProxyConfiguration.Resource.Id), err)
}
if _, err := s.writeResource(cluster, res); err != nil {
return err
}
}
} else {
if err := s.registerCatalogServiceV1(cluster, node, svc); err != nil {
return fmt.Errorf("error registering service: %w", err)
}
if !svc.DisableServiceMesh {
if err := s.registerCatalogSidecarServiceV1(cluster, node, svc); err != nil {
return fmt.Errorf("error registering sidecar service: %w", err)
}
}
}
return nil
}
// deregisterServiceAtNode is called when node is disabled
deregisterServiceAtNode := func(node *topology.Node, svc *topology.Service) error {
if node.IsV2() {
// TODO: implement deregister services for v2
panic("deregister services is not implemented for V2")
} else {
if err := s.deregisterCatalogServiceV1(cluster, node, svc); err != nil {
return fmt.Errorf("error deregistering service: %w", err)
}
if !svc.DisableServiceMesh {
if err := s.deregisterCatalogSidecarServiceV1(cluster, node, svc); err != nil {
return fmt.Errorf("error deregistering sidecar service: %w", err)
}
}
}
return nil
}
var syncService func(node *topology.Node, svc *topology.Service) error
for _, node := range cluster.Nodes {
if !node.RunsWorkloads() || len(node.Services) == 0 || node.Disabled {
if !node.RunsWorkloads() || len(node.Services) == 0 {
continue
}
@ -188,82 +272,42 @@ func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster
continue
}
if err := s.registerCatalogNode(cluster, node); err != nil {
return fmt.Errorf("error registering virtual node: %w", err)
// Register virtual node service first if node is not disabled
if !node.Disabled {
if err := s.registerCatalogNode(cluster, node); err != nil {
return fmt.Errorf("error registering virtual node: %w", err)
}
}
// Register/deregister services on the node
for _, svc := range node.Services {
if node.IsV2() {
pending := serviceInstanceToResources(node, svc)
workloadID := topology.NewServiceID(svc.WorkloadIdentity, svc.ID.Namespace, svc.ID.Partition)
if _, ok := identityInfo[workloadID]; !ok {
identityInfo[workloadID] = pending.WorkloadIdentity
}
// Write workload
res, err := pending.Workload.Build()
if err != nil {
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.Workload.Resource.Id), err)
}
workload, err := s.writeResource(cluster, res)
if err != nil {
return err
}
// Write check linked to workload
for _, check := range pending.HealthStatuses {
check.Resource.Owner = workload.Id
res, err := check.Build()
if err != nil {
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(check.Resource.Id), err)
}
if _, err := s.writeResource(cluster, res); err != nil {
return err
}
}
// maybe write destinations
if pending.Destinations != nil {
res, err := pending.Destinations.Build()
if err != nil {
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.Destinations.Resource.Id), err)
}
if _, err := s.writeResource(cluster, res); err != nil {
return err
}
}
if pending.ProxyConfiguration != nil {
res, err := pending.ProxyConfiguration.Build()
if err != nil {
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.ProxyConfiguration.Resource.Id), err)
}
if _, err := s.writeResource(cluster, res); err != nil {
return err
}
}
if !node.Disabled {
syncService = registerServiceAtNode
} else {
if err := s.registerCatalogServiceV1(cluster, node, svc); err != nil {
return fmt.Errorf("error registering service: %w", err)
}
if !svc.DisableServiceMesh {
if err := s.registerCatalogSidecarServiceV1(cluster, node, svc); err != nil {
return fmt.Errorf("error registering sidecar service: %w", err)
}
}
syncService = deregisterServiceAtNode
}
syncService(node, svc)
}
// Deregister the virtual node if node is disabled
if node.Disabled {
if err := s.deregisterCatalogNode(cluster, node); err != nil {
return fmt.Errorf("error deregistering virtual node: %w", err)
}
}
}
for _, identity := range identityInfo {
res, err := identity.Build()
if err != nil {
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(identity.Resource.Id), err)
}
if _, err := s.writeResource(cluster, res); err != nil {
return err
}
}
if cluster.EnableV2 {
for _, identity := range identityInfo {
res, err := identity.Build()
if err != nil {
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(identity.Resource.Id), err)
}
if _, err := s.writeResource(cluster, res); err != nil {
return err
}
}
for id, svcData := range cluster.Services {
svcInfo := &Resource[*pbcatalog.Service]{
Resource: &pbresource.Resource{
@ -311,6 +355,16 @@ func (s *Sprawl) registerCatalogNode(
return s.registerCatalogNodeV1(cluster, node)
}
func (s *Sprawl) deregisterCatalogNode(
cluster *topology.Cluster,
node *topology.Node,
) error {
if node.IsV2() {
panic("deregister V2 node is not implemented")
}
return s.deregisterCatalogNodeV1(cluster, node)
}
func (s *Sprawl) registerCatalogNodeV2(
cluster *topology.Cluster,
node *topology.Node,
@ -413,6 +467,82 @@ RETRY:
return nil
}
func (s *Sprawl) deregisterCatalogNodeV1(
cluster *topology.Cluster,
node *topology.Node,
) error {
if !node.IsDataplane() {
panic("called wrong method type")
}
var (
client = s.clients[cluster.Name]
logger = s.logger.With("cluster", cluster.Name)
)
dereg := &api.CatalogDeregistration{
Node: node.PodName(),
Address: node.LocalAddress(),
}
if cluster.Enterprise {
dereg.Partition = node.Partition
}
// deregister synthetic node
RETRY:
if _, err := client.Catalog().Deregister(dereg, nil); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("error deregistering virtual node %s: %w", node.ID(), err)
}
logger.Info("virtual node removed",
"node", node.ID(),
)
return nil
}
func (s *Sprawl) deregisterCatalogServiceV1(
cluster *topology.Cluster,
node *topology.Node,
svc *topology.Service,
) error {
if !node.IsDataplane() {
panic("called wrong method type")
}
if node.IsV2() {
panic("don't call this")
}
var (
client = s.clients[cluster.Name]
logger = s.logger.With("cluster", cluster.Name)
)
dereg := &api.CatalogDeregistration{
Node: node.PodName(),
ServiceID: svc.ID.Name,
}
RETRY:
if _, err := client.Catalog().Deregister(dereg, nil); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("error deregistering service %s at node %s: %w", svc.ID, node.ID(), err)
}
logger.Info("dataplane service removed",
"service", svc.ID,
"node", node.ID(),
)
return nil
}
func (s *Sprawl) registerCatalogServiceV1(
cluster *topology.Cluster,
node *topology.Node,
@ -449,6 +579,50 @@ RETRY:
return nil
}
func (s *Sprawl) deregisterCatalogSidecarServiceV1(
cluster *topology.Cluster,
node *topology.Node,
svc *topology.Service,
) error {
if !node.IsDataplane() {
panic("called wrong method type")
}
if svc.DisableServiceMesh {
panic("not valid")
}
if node.IsV2() {
panic("don't call this")
}
var (
client = s.clients[cluster.Name]
logger = s.logger.With("cluster", cluster.Name)
)
pid := svc.ID
pid.Name += "-sidecar-proxy"
dereg := &api.CatalogDeregistration{
Node: node.PodName(),
ServiceID: pid.Name,
}
RETRY:
if _, err := client.Catalog().Deregister(dereg, nil); err != nil {
if isACLNotFound(err) {
time.Sleep(50 * time.Millisecond)
goto RETRY
}
return fmt.Errorf("error deregistering service %s to node %s: %w", svc.ID, node.ID(), err)
}
logger.Info("dataplane sidecar service removed",
"service", pid,
"node", node.ID(),
)
return nil
}
func (s *Sprawl) registerCatalogSidecarServiceV1(
cluster *topology.Cluster,
node *topology.Node,