From 6da4798e05b8a9c2267f607ab9efaf0d1d79abe6 Mon Sep 17 00:00:00 2001 From: John Murret Date: Wed, 11 Oct 2023 20:07:50 -0600 Subject: [PATCH] NET-5799 - ensure catalog controllers and dependency mappers function correctly for tenancy fields (#19142) * use bimapper * WIP * clean up * PR feedback --- .../controllers/workloadhealth/controller.go | 6 +- .../workloadhealth/controller_test.go | 42 +++++------ .../mappers/nodemapper/node_mapper.go | 70 +++---------------- 3 files changed, 36 insertions(+), 82 deletions(-) diff --git a/internal/catalog/internal/controllers/workloadhealth/controller.go b/internal/catalog/internal/controllers/workloadhealth/controller.go index ab25346454..05cfd36a84 100644 --- a/internal/catalog/internal/controllers/workloadhealth/controller.go +++ b/internal/catalog/internal/controllers/workloadhealth/controller.go @@ -109,7 +109,10 @@ func (r *workloadHealthReconciler) Reconcile(ctx context.Context, rt controller. r.nodeMap.UntrackWorkload(res.Id) } - workloadHealth, err := getWorkloadHealth(ctx, rt, req.ID) + // passing the workload from the response because getWorkloadHealth uses + // resourceClient.ListByOwner which requires ownerID have a Uid and this is the + // safest way for application and test code to ensure Uid is provided. + workloadHealth, err := getWorkloadHealth(ctx, rt, rsp.Resource.Id) if err != nil { // This should be impossible under normal operations and will not be exercised // within the unit tests. This can only fail if the resource service fails @@ -199,6 +202,7 @@ func getNodeHealth(ctx context.Context, rt controller.Runtime, nodeRef *pbresour } func getWorkloadHealth(ctx context.Context, rt controller.Runtime, workloadRef *pbresource.ID) (pbcatalog.Health, error) { + rt.Logger.Trace("getWorkloadHealth", "workloadRef", workloadRef) rsp, err := rt.Client.ListByOwner(ctx, &pbresource.ListByOwnerRequest{ Owner: workloadRef, }) diff --git a/internal/catalog/internal/controllers/workloadhealth/controller_test.go b/internal/catalog/internal/controllers/workloadhealth/controller_test.go index 56ce526c80..9a00a940a0 100644 --- a/internal/catalog/internal/controllers/workloadhealth/controller_test.go +++ b/internal/catalog/internal/controllers/workloadhealth/controller_test.go @@ -6,7 +6,10 @@ package workloadhealth import ( "context" "fmt" + "github.com/hashicorp/consul/internal/resource" + "google.golang.org/protobuf/testing/protocmp" "testing" + "time" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -44,13 +47,9 @@ var ( func resourceID(rtype *pbresource.Type, name string) *pbresource.ID { return &pbresource.ID{ - Type: rtype, - Tenancy: &pbresource.Tenancy{ - Partition: "default", - Namespace: "default", - PeerName: "local", - }, - Name: name, + Type: rtype, + Tenancy: resource.DefaultNamespacedTenancy(), + Name: name, } } @@ -167,7 +166,8 @@ func (suite *workloadHealthControllerTestSuite) testReconcileWithNode(nodeHealth reqs, err := suite.mapper.MapNodeToWorkloads(context.Background(), suite.runtime, node) require.NoError(suite.T(), err) require.Len(suite.T(), reqs, 1) - prototest.AssertDeepEqual(suite.T(), reqs[0].ID, workload.Id) + protocmp.Transform() + prototest.AssertDeepEqual(suite.T(), workload.Id, reqs[0].ID, protocmp.IgnoreFields(workload.Id, "uid")) suite.T().Cleanup(func() { // future calls to reconcile would normally have done this as the resource was @@ -388,6 +388,7 @@ func (suite *workloadHealthControllerTestSuite) TestReconcileNotFound() { WithData(suite.T(), workloadData("test-node")). // don't write this because then in the call to reconcile the resource // would be found and defeat the purpose of the tes + WithTenancy(resource.DefaultNamespacedTenancy()). Build() node := resourcetest.Resource(pbcatalog.NodeType, "test-node"). @@ -508,7 +509,7 @@ func (suite *workloadHealthControllerTestSuite) TestController() { // Wait for reconciliation to occur and mark the workload as passing. suite.waitForReconciliation(workload.Id, "HEALTH_PASSING") - // Simulate a node unhealty + // Simulate a node unhealthy suite.injectNodeWithStatus("test-node", pbcatalog.Health_HEALTH_WARNING) // Wait for reconciliation to occur and mark the workload as warning @@ -545,18 +546,19 @@ func (suite *workloadHealthControllerTestSuite) TestController() { func (suite *workloadHealthControllerTestSuite) waitForReconciliation(id *pbresource.ID, reason string) { suite.T().Helper() - retry.Run(suite.T(), func(r *retry.R) { - rsp, err := suite.client.Read(context.Background(), &pbresource.ReadRequest{ - Id: id, - }) - require.NoError(r, err) + retry.RunWith(&retry.Timer{Wait: 100 * time.Millisecond, Timeout: 5 * time.Second}, + suite.T(), func(r *retry.R) { + rsp, err := suite.client.Read(context.Background(), &pbresource.ReadRequest{ + Id: id, + }) + require.NoError(r, err) - status, found := rsp.Resource.Status[StatusKey] - require.True(r, found) - require.Equal(r, rsp.Resource.Generation, status.ObservedGeneration) - require.Len(r, status.Conditions, 1) - require.Equal(r, reason, status.Conditions[0].Reason) - }) + status, found := rsp.Resource.Status[StatusKey] + require.True(r, found) + require.Equal(r, rsp.Resource.Generation, status.ObservedGeneration) + require.Len(r, status.Conditions, 1) + require.Equal(r, reason, status.Conditions[0].Reason) + }) } func TestWorkloadHealthController(t *testing.T) { diff --git a/internal/catalog/internal/mappers/nodemapper/node_mapper.go b/internal/catalog/internal/mappers/nodemapper/node_mapper.go index 4a9a3cfe38..9c17478d76 100644 --- a/internal/catalog/internal/mappers/nodemapper/node_mapper.go +++ b/internal/catalog/internal/mappers/nodemapper/node_mapper.go @@ -5,24 +5,20 @@ package nodemapper import ( "context" - "sync" - "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/mappers/bimapper" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" "github.com/hashicorp/consul/proto-public/pbresource" ) type NodeMapper struct { - lock sync.Mutex - nodesToWorkloads map[string][]controller.Request - workloadsToNodes map[string]string + b *bimapper.Mapper } func New() *NodeMapper { return &NodeMapper{ - workloadsToNodes: make(map[string]string), - nodesToWorkloads: make(map[string][]controller.Request), + b: bimapper.New(pbcatalog.WorkloadType, pbcatalog.NodeType), } } @@ -39,68 +35,20 @@ func (m *NodeMapper) NodeIDFromWorkload(workload *pbresource.Resource, workloadD // MapNodeToWorkloads will take a Node resource and return controller requests // for all Workloads associated with the Node. func (m *NodeMapper) MapNodeToWorkloads(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) { - m.lock.Lock() - defer m.lock.Unlock() - return m.nodesToWorkloads[res.Id.Name], nil + ids := m.b.ItemIDsForLink(res.Id) + return controller.MakeRequests(pbcatalog.WorkloadType, ids), nil } // TrackWorkload instructs the NodeMapper to associate the given workload // ID with the given node ID. func (m *NodeMapper) TrackWorkload(workloadID *pbresource.ID, nodeID *pbresource.ID) { - m.lock.Lock() - defer m.lock.Unlock() - - if previousNode, found := m.workloadsToNodes[workloadID.Name]; found && previousNode == nodeID.Name { - return - } else if found { - // the node association is being changed - m.untrackWorkloadFromNode(workloadID, previousNode) - } - - // Now set up the latest tracking - m.nodesToWorkloads[nodeID.Name] = append(m.nodesToWorkloads[nodeID.Name], controller.Request{ID: workloadID}) - m.workloadsToNodes[workloadID.Name] = nodeID.Name + m.b.TrackItem(workloadID, []resource.ReferenceOrID{ + nodeID, + }) } // UntrackWorkload will cause the node mapper to forget about the specified // workload if it is currently tracking it. func (m *NodeMapper) UntrackWorkload(workloadID *pbresource.ID) { - m.lock.Lock() - defer m.lock.Unlock() - - node, found := m.workloadsToNodes[workloadID.Name] - if !found { - return - } - m.untrackWorkloadFromNode(workloadID, node) -} - -// untrackWorkloadFromNode will disassociate the specified workload and node. -// This method will clean up unnecessary tracking entries if the node name -// is no longer associated with any workloads. -func (m *NodeMapper) untrackWorkloadFromNode(workloadID *pbresource.ID, node string) { - foundIdx := -1 - for idx, req := range m.nodesToWorkloads[node] { - if resource.EqualID(req.ID, workloadID) { - foundIdx = idx - break - } - } - - if foundIdx != -1 { - workloads := m.nodesToWorkloads[node] - l := len(workloads) - - if l == 1 { - delete(m.nodesToWorkloads, node) - } else if foundIdx == l-1 { - m.nodesToWorkloads[node] = workloads[:foundIdx] - } else if foundIdx == 0 { - m.nodesToWorkloads[node] = workloads[1:] - } else { - m.nodesToWorkloads[node] = append(workloads[:foundIdx], workloads[foundIdx+1:]...) - } - } - - delete(m.workloadsToNodes, workloadID.Name) + m.b.UntrackItem(workloadID) }