From 8d6545ec4399b7c14f6ecdb63298cf17c2f4d3eb Mon Sep 17 00:00:00 2001 From: cskh Date: Tue, 7 Nov 2023 17:29:13 -0500 Subject: [PATCH] test/deployer: add the method of deregistering services (#19525) --- test-integ/connect/snapshot_test.go | 2 +- testing/deployer/sprawl/boot.go | 17 +- testing/deployer/sprawl/catalog.go | 320 +++++++++++++++++++++------- 3 files changed, 258 insertions(+), 81 deletions(-) diff --git a/test-integ/connect/snapshot_test.go b/test-integ/connect/snapshot_test.go index 469562a160..643e64d4b6 100644 --- a/test-integ/connect/snapshot_test.go +++ b/test-integ/connect/snapshot_test.go @@ -204,7 +204,7 @@ func Test_Snapshot_Restore_Agentless(t *testing.T) { // Add a new static-server cfg = sp.Config() cluster = cfg.Cluster("dc1") - cluster.Nodes[3].Disabled = false // client 3 -- static-server + cluster.Nodes[3].Disabled = false // client 3 -- new static-server require.NoError(t, sp.Relaunch(cfg)) // Ensure the static-client connected to static-server diff --git a/testing/deployer/sprawl/boot.go b/testing/deployer/sprawl/boot.go index 3bbb9e864e..f749093fce 100644 --- a/testing/deployer/sprawl/boot.go +++ b/testing/deployer/sprawl/boot.go @@ -182,7 +182,7 @@ func (s *Sprawl) assignIPAddresses() error { return fmt.Errorf("unknown network %q", addr.Network) } addr.IPAddress = net.IPByIndex(node.Index) - s.logger.Info("assign addr", "node", node.Name, "addr", addr.IPAddress) + s.logger.Info("assign addr", "node", node.Name, "addr", addr.IPAddress, "enabled", !node.Disabled) } } } @@ -260,8 +260,11 @@ func (s *Sprawl) initConsulServers() error { return fmt.Errorf("populateInitialConfigEntries[%s]: %w", cluster.Name, err) } - if err := s.populateInitialResources(cluster); err != nil { - return fmt.Errorf("populateInitialResources[%s]: %w", cluster.Name, err) + if cluster.EnableV2 { + // Resources are available only in V2 + if err := s.populateInitialResources(cluster); err != nil { + return fmt.Errorf("populateInitialResources[%s]: %w", cluster.Name, err) + } } if err := s.createAnonymousToken(cluster); err != nil { @@ -300,8 +303,8 @@ func (s *Sprawl) createFirstTime() error { return fmt.Errorf("createAllServiceTokens: %w", err) } - if err := s.registerAllServicesForDataplaneInstances(); err != nil { - return fmt.Errorf("registerAllServicesForDataplaneInstances: %w", err) + if err := s.syncAllServicesForDataplaneInstances(); err != nil { + return fmt.Errorf("syncAllServicesForDataplaneInstances: %w", err) } // We can do this ahead, because we've incrementally run terraform as @@ -368,8 +371,8 @@ func (s *Sprawl) preRegenTasks() error { return fmt.Errorf("createAllServiceTokens: %w", err) } - if err := s.registerAllServicesForDataplaneInstances(); err != nil { - return fmt.Errorf("registerAllServicesForDataplaneInstances: %w", err) + if err := s.syncAllServicesForDataplaneInstances(); err != nil { + return fmt.Errorf("syncAllServicesForDataplaneInstances: %w", err) } return nil diff --git a/testing/deployer/sprawl/catalog.go b/testing/deployer/sprawl/catalog.go index fdc95c9f68..1a11665eaa 100644 --- a/testing/deployer/sprawl/catalog.go +++ b/testing/deployer/sprawl/catalog.go @@ -21,6 +21,7 @@ import ( "github.com/hashicorp/consul/testing/deployer/util" ) +// registerAllServicesToAgents registers services in agent-ful mode func (s *Sprawl) registerAllServicesToAgents() error { for _, cluster := range s.topology.Clusters { if err := s.registerServicesToAgents(cluster); err != nil { @@ -30,10 +31,10 @@ func (s *Sprawl) registerAllServicesToAgents() error { return nil } -func (s *Sprawl) registerAllServicesForDataplaneInstances() error { +func (s *Sprawl) syncAllServicesForDataplaneInstances() error { for _, cluster := range s.topology.Clusters { - if err := s.registerServicesForDataplaneInstances(cluster); err != nil { - return fmt.Errorf("registerServicesForDataplaneInstances[%s]: %w", cluster.Name, err) + if err := s.syncServicesForDataplaneInstances(cluster); err != nil { + return fmt.Errorf("syncServicesForDataplaneInstances[%s]: %w", cluster.Name, err) } } return nil @@ -86,7 +87,7 @@ func (s *Sprawl) registerAgentService( } if svc.IsMeshGateway { - return nil // handled at startup time for agent-full, but won't be for agent-less + return nil // handled at startup time for agent-ful, but won't be for agent-less } var ( @@ -176,11 +177,94 @@ RETRY: return nil } -func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster) error { +// syncServicesForDataplaneInstances register/deregister services in the given cluster +func (s *Sprawl) syncServicesForDataplaneInstances(cluster *topology.Cluster) error { identityInfo := make(map[topology.ServiceID]*Resource[*pbauth.WorkloadIdentity]) + // registerServiceAtNode is called when node is not disabled + registerServiceAtNode := func(node *topology.Node, svc *topology.Service) error { + if node.IsV2() { + pending := serviceInstanceToResources(node, svc) + + workloadID := topology.NewServiceID(svc.WorkloadIdentity, svc.ID.Namespace, svc.ID.Partition) + if _, ok := identityInfo[workloadID]; !ok { + identityInfo[workloadID] = pending.WorkloadIdentity + } + + // Write workload + res, err := pending.Workload.Build() + if err != nil { + return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.Workload.Resource.Id), err) + } + workload, err := s.writeResource(cluster, res) + if err != nil { + return err + } + // Write check linked to workload + for _, check := range pending.HealthStatuses { + check.Resource.Owner = workload.Id + res, err := check.Build() + if err != nil { + return fmt.Errorf("error serializing resource %s: %w", util.IDToString(check.Resource.Id), err) + } + if _, err := s.writeResource(cluster, res); err != nil { + return err + } + } + // maybe write destinations + if pending.Destinations != nil { + res, err := pending.Destinations.Build() + if err != nil { + return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.Destinations.Resource.Id), err) + } + if _, err := s.writeResource(cluster, res); err != nil { + return err + } + } + if pending.ProxyConfiguration != nil { + res, err := pending.ProxyConfiguration.Build() + if err != nil { + return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.ProxyConfiguration.Resource.Id), err) + } + if _, err := s.writeResource(cluster, res); err != nil { + return err + } + } + } else { + if err := s.registerCatalogServiceV1(cluster, node, svc); err != nil { + return fmt.Errorf("error registering service: %w", err) + } + if !svc.DisableServiceMesh { + if err := s.registerCatalogSidecarServiceV1(cluster, node, svc); err != nil { + return fmt.Errorf("error registering sidecar service: %w", err) + } + } + } + return nil + } + + // deregisterServiceAtNode is called when node is disabled + deregisterServiceAtNode := func(node *topology.Node, svc *topology.Service) error { + if node.IsV2() { + // TODO: implement deregister services for v2 + panic("deregister services is not implemented for V2") + } else { + if err := s.deregisterCatalogServiceV1(cluster, node, svc); err != nil { + return fmt.Errorf("error deregistering service: %w", err) + } + if !svc.DisableServiceMesh { + if err := s.deregisterCatalogSidecarServiceV1(cluster, node, svc); err != nil { + return fmt.Errorf("error deregistering sidecar service: %w", err) + } + } + } + return nil + } + + var syncService func(node *topology.Node, svc *topology.Service) error + for _, node := range cluster.Nodes { - if !node.RunsWorkloads() || len(node.Services) == 0 || node.Disabled { + if !node.RunsWorkloads() || len(node.Services) == 0 { continue } @@ -188,82 +272,42 @@ func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster continue } - if err := s.registerCatalogNode(cluster, node); err != nil { - return fmt.Errorf("error registering virtual node: %w", err) + // Register virtual node service first if node is not disabled + if !node.Disabled { + if err := s.registerCatalogNode(cluster, node); err != nil { + return fmt.Errorf("error registering virtual node: %w", err) + } } + // Register/deregister services on the node for _, svc := range node.Services { - if node.IsV2() { - pending := serviceInstanceToResources(node, svc) - - workloadID := topology.NewServiceID(svc.WorkloadIdentity, svc.ID.Namespace, svc.ID.Partition) - if _, ok := identityInfo[workloadID]; !ok { - identityInfo[workloadID] = pending.WorkloadIdentity - } - - // Write workload - res, err := pending.Workload.Build() - if err != nil { - return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.Workload.Resource.Id), err) - } - workload, err := s.writeResource(cluster, res) - if err != nil { - return err - } - // Write check linked to workload - for _, check := range pending.HealthStatuses { - check.Resource.Owner = workload.Id - res, err := check.Build() - if err != nil { - return fmt.Errorf("error serializing resource %s: %w", util.IDToString(check.Resource.Id), err) - } - if _, err := s.writeResource(cluster, res); err != nil { - return err - } - } - // maybe write destinations - if pending.Destinations != nil { - res, err := pending.Destinations.Build() - if err != nil { - return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.Destinations.Resource.Id), err) - } - if _, err := s.writeResource(cluster, res); err != nil { - return err - } - } - if pending.ProxyConfiguration != nil { - res, err := pending.ProxyConfiguration.Build() - if err != nil { - return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.ProxyConfiguration.Resource.Id), err) - } - if _, err := s.writeResource(cluster, res); err != nil { - return err - } - } + if !node.Disabled { + syncService = registerServiceAtNode } else { - if err := s.registerCatalogServiceV1(cluster, node, svc); err != nil { - return fmt.Errorf("error registering service: %w", err) - } - if !svc.DisableServiceMesh { - if err := s.registerCatalogSidecarServiceV1(cluster, node, svc); err != nil { - return fmt.Errorf("error registering sidecar service: %w", err) - } - } + syncService = deregisterServiceAtNode + } + syncService(node, svc) + } + + // Deregister the virtual node if node is disabled + if node.Disabled { + if err := s.deregisterCatalogNode(cluster, node); err != nil { + return fmt.Errorf("error deregistering virtual node: %w", err) } } } - for _, identity := range identityInfo { - res, err := identity.Build() - if err != nil { - return fmt.Errorf("error serializing resource %s: %w", util.IDToString(identity.Resource.Id), err) - } - if _, err := s.writeResource(cluster, res); err != nil { - return err - } - } - if cluster.EnableV2 { + for _, identity := range identityInfo { + res, err := identity.Build() + if err != nil { + return fmt.Errorf("error serializing resource %s: %w", util.IDToString(identity.Resource.Id), err) + } + if _, err := s.writeResource(cluster, res); err != nil { + return err + } + } + for id, svcData := range cluster.Services { svcInfo := &Resource[*pbcatalog.Service]{ Resource: &pbresource.Resource{ @@ -311,6 +355,16 @@ func (s *Sprawl) registerCatalogNode( return s.registerCatalogNodeV1(cluster, node) } +func (s *Sprawl) deregisterCatalogNode( + cluster *topology.Cluster, + node *topology.Node, +) error { + if node.IsV2() { + panic("deregister V2 node is not implemented") + } + return s.deregisterCatalogNodeV1(cluster, node) +} + func (s *Sprawl) registerCatalogNodeV2( cluster *topology.Cluster, node *topology.Node, @@ -413,6 +467,82 @@ RETRY: return nil } +func (s *Sprawl) deregisterCatalogNodeV1( + cluster *topology.Cluster, + node *topology.Node, +) error { + if !node.IsDataplane() { + panic("called wrong method type") + } + + var ( + client = s.clients[cluster.Name] + logger = s.logger.With("cluster", cluster.Name) + ) + + dereg := &api.CatalogDeregistration{ + Node: node.PodName(), + Address: node.LocalAddress(), + } + if cluster.Enterprise { + dereg.Partition = node.Partition + } + + // deregister synthetic node +RETRY: + if _, err := client.Catalog().Deregister(dereg, nil); err != nil { + if isACLNotFound(err) { + time.Sleep(50 * time.Millisecond) + goto RETRY + } + return fmt.Errorf("error deregistering virtual node %s: %w", node.ID(), err) + } + + logger.Info("virtual node removed", + "node", node.ID(), + ) + + return nil +} + +func (s *Sprawl) deregisterCatalogServiceV1( + cluster *topology.Cluster, + node *topology.Node, + svc *topology.Service, +) error { + if !node.IsDataplane() { + panic("called wrong method type") + } + if node.IsV2() { + panic("don't call this") + } + + var ( + client = s.clients[cluster.Name] + logger = s.logger.With("cluster", cluster.Name) + ) + + dereg := &api.CatalogDeregistration{ + Node: node.PodName(), + ServiceID: svc.ID.Name, + } +RETRY: + if _, err := client.Catalog().Deregister(dereg, nil); err != nil { + if isACLNotFound(err) { + time.Sleep(50 * time.Millisecond) + goto RETRY + } + return fmt.Errorf("error deregistering service %s at node %s: %w", svc.ID, node.ID(), err) + } + + logger.Info("dataplane service removed", + "service", svc.ID, + "node", node.ID(), + ) + + return nil +} + func (s *Sprawl) registerCatalogServiceV1( cluster *topology.Cluster, node *topology.Node, @@ -449,6 +579,50 @@ RETRY: return nil } +func (s *Sprawl) deregisterCatalogSidecarServiceV1( + cluster *topology.Cluster, + node *topology.Node, + svc *topology.Service, +) error { + if !node.IsDataplane() { + panic("called wrong method type") + } + if svc.DisableServiceMesh { + panic("not valid") + } + if node.IsV2() { + panic("don't call this") + } + + var ( + client = s.clients[cluster.Name] + logger = s.logger.With("cluster", cluster.Name) + ) + + pid := svc.ID + pid.Name += "-sidecar-proxy" + dereg := &api.CatalogDeregistration{ + Node: node.PodName(), + ServiceID: pid.Name, + } + +RETRY: + if _, err := client.Catalog().Deregister(dereg, nil); err != nil { + if isACLNotFound(err) { + time.Sleep(50 * time.Millisecond) + goto RETRY + } + return fmt.Errorf("error deregistering service %s to node %s: %w", svc.ID, node.ID(), err) + } + + logger.Info("dataplane sidecar service removed", + "service", pid, + "node", node.ID(), + ) + + return nil +} + func (s *Sprawl) registerCatalogSidecarServiceV1( cluster *topology.Cluster, node *topology.Node,