mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 14:55:02 +00:00
NET-5799 - ensure catalog controllers and dependency mappers function correctly for tenancy fields (#19142)
* use bimapper * WIP * clean up * PR feedback
This commit is contained in:
parent
60b75a55f7
commit
6da4798e05
@ -109,7 +109,10 @@ func (r *workloadHealthReconciler) Reconcile(ctx context.Context, rt controller.
|
||||
r.nodeMap.UntrackWorkload(res.Id)
|
||||
}
|
||||
|
||||
workloadHealth, err := getWorkloadHealth(ctx, rt, req.ID)
|
||||
// passing the workload from the response because getWorkloadHealth uses
|
||||
// resourceClient.ListByOwner which requires ownerID have a Uid and this is the
|
||||
// safest way for application and test code to ensure Uid is provided.
|
||||
workloadHealth, err := getWorkloadHealth(ctx, rt, rsp.Resource.Id)
|
||||
if err != nil {
|
||||
// This should be impossible under normal operations and will not be exercised
|
||||
// within the unit tests. This can only fail if the resource service fails
|
||||
@ -199,6 +202,7 @@ func getNodeHealth(ctx context.Context, rt controller.Runtime, nodeRef *pbresour
|
||||
}
|
||||
|
||||
func getWorkloadHealth(ctx context.Context, rt controller.Runtime, workloadRef *pbresource.ID) (pbcatalog.Health, error) {
|
||||
rt.Logger.Trace("getWorkloadHealth", "workloadRef", workloadRef)
|
||||
rsp, err := rt.Client.ListByOwner(ctx, &pbresource.ListByOwnerRequest{
|
||||
Owner: workloadRef,
|
||||
})
|
||||
|
@ -6,7 +6,10 @@ package workloadhealth
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"google.golang.org/protobuf/testing/protocmp"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
@ -44,13 +47,9 @@ var (
|
||||
|
||||
func resourceID(rtype *pbresource.Type, name string) *pbresource.ID {
|
||||
return &pbresource.ID{
|
||||
Type: rtype,
|
||||
Tenancy: &pbresource.Tenancy{
|
||||
Partition: "default",
|
||||
Namespace: "default",
|
||||
PeerName: "local",
|
||||
},
|
||||
Name: name,
|
||||
Type: rtype,
|
||||
Tenancy: resource.DefaultNamespacedTenancy(),
|
||||
Name: name,
|
||||
}
|
||||
}
|
||||
|
||||
@ -167,7 +166,8 @@ func (suite *workloadHealthControllerTestSuite) testReconcileWithNode(nodeHealth
|
||||
reqs, err := suite.mapper.MapNodeToWorkloads(context.Background(), suite.runtime, node)
|
||||
require.NoError(suite.T(), err)
|
||||
require.Len(suite.T(), reqs, 1)
|
||||
prototest.AssertDeepEqual(suite.T(), reqs[0].ID, workload.Id)
|
||||
protocmp.Transform()
|
||||
prototest.AssertDeepEqual(suite.T(), workload.Id, reqs[0].ID, protocmp.IgnoreFields(workload.Id, "uid"))
|
||||
|
||||
suite.T().Cleanup(func() {
|
||||
// future calls to reconcile would normally have done this as the resource was
|
||||
@ -388,6 +388,7 @@ func (suite *workloadHealthControllerTestSuite) TestReconcileNotFound() {
|
||||
WithData(suite.T(), workloadData("test-node")).
|
||||
// don't write this because then in the call to reconcile the resource
|
||||
// would be found and defeat the purpose of the tes
|
||||
WithTenancy(resource.DefaultNamespacedTenancy()).
|
||||
Build()
|
||||
|
||||
node := resourcetest.Resource(pbcatalog.NodeType, "test-node").
|
||||
@ -508,7 +509,7 @@ func (suite *workloadHealthControllerTestSuite) TestController() {
|
||||
// Wait for reconciliation to occur and mark the workload as passing.
|
||||
suite.waitForReconciliation(workload.Id, "HEALTH_PASSING")
|
||||
|
||||
// Simulate a node unhealty
|
||||
// Simulate a node unhealthy
|
||||
suite.injectNodeWithStatus("test-node", pbcatalog.Health_HEALTH_WARNING)
|
||||
|
||||
// Wait for reconciliation to occur and mark the workload as warning
|
||||
@ -545,18 +546,19 @@ func (suite *workloadHealthControllerTestSuite) TestController() {
|
||||
func (suite *workloadHealthControllerTestSuite) waitForReconciliation(id *pbresource.ID, reason string) {
|
||||
suite.T().Helper()
|
||||
|
||||
retry.Run(suite.T(), func(r *retry.R) {
|
||||
rsp, err := suite.client.Read(context.Background(), &pbresource.ReadRequest{
|
||||
Id: id,
|
||||
})
|
||||
require.NoError(r, err)
|
||||
retry.RunWith(&retry.Timer{Wait: 100 * time.Millisecond, Timeout: 5 * time.Second},
|
||||
suite.T(), func(r *retry.R) {
|
||||
rsp, err := suite.client.Read(context.Background(), &pbresource.ReadRequest{
|
||||
Id: id,
|
||||
})
|
||||
require.NoError(r, err)
|
||||
|
||||
status, found := rsp.Resource.Status[StatusKey]
|
||||
require.True(r, found)
|
||||
require.Equal(r, rsp.Resource.Generation, status.ObservedGeneration)
|
||||
require.Len(r, status.Conditions, 1)
|
||||
require.Equal(r, reason, status.Conditions[0].Reason)
|
||||
})
|
||||
status, found := rsp.Resource.Status[StatusKey]
|
||||
require.True(r, found)
|
||||
require.Equal(r, rsp.Resource.Generation, status.ObservedGeneration)
|
||||
require.Len(r, status.Conditions, 1)
|
||||
require.Equal(r, reason, status.Conditions[0].Reason)
|
||||
})
|
||||
}
|
||||
|
||||
func TestWorkloadHealthController(t *testing.T) {
|
||||
|
@ -5,24 +5,20 @@ package nodemapper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/resource/mappers/bimapper"
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
type NodeMapper struct {
|
||||
lock sync.Mutex
|
||||
nodesToWorkloads map[string][]controller.Request
|
||||
workloadsToNodes map[string]string
|
||||
b *bimapper.Mapper
|
||||
}
|
||||
|
||||
func New() *NodeMapper {
|
||||
return &NodeMapper{
|
||||
workloadsToNodes: make(map[string]string),
|
||||
nodesToWorkloads: make(map[string][]controller.Request),
|
||||
b: bimapper.New(pbcatalog.WorkloadType, pbcatalog.NodeType),
|
||||
}
|
||||
}
|
||||
|
||||
@ -39,68 +35,20 @@ func (m *NodeMapper) NodeIDFromWorkload(workload *pbresource.Resource, workloadD
|
||||
// MapNodeToWorkloads will take a Node resource and return controller requests
|
||||
// for all Workloads associated with the Node.
|
||||
func (m *NodeMapper) MapNodeToWorkloads(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
return m.nodesToWorkloads[res.Id.Name], nil
|
||||
ids := m.b.ItemIDsForLink(res.Id)
|
||||
return controller.MakeRequests(pbcatalog.WorkloadType, ids), nil
|
||||
}
|
||||
|
||||
// TrackWorkload instructs the NodeMapper to associate the given workload
|
||||
// ID with the given node ID.
|
||||
func (m *NodeMapper) TrackWorkload(workloadID *pbresource.ID, nodeID *pbresource.ID) {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
if previousNode, found := m.workloadsToNodes[workloadID.Name]; found && previousNode == nodeID.Name {
|
||||
return
|
||||
} else if found {
|
||||
// the node association is being changed
|
||||
m.untrackWorkloadFromNode(workloadID, previousNode)
|
||||
}
|
||||
|
||||
// Now set up the latest tracking
|
||||
m.nodesToWorkloads[nodeID.Name] = append(m.nodesToWorkloads[nodeID.Name], controller.Request{ID: workloadID})
|
||||
m.workloadsToNodes[workloadID.Name] = nodeID.Name
|
||||
m.b.TrackItem(workloadID, []resource.ReferenceOrID{
|
||||
nodeID,
|
||||
})
|
||||
}
|
||||
|
||||
// UntrackWorkload will cause the node mapper to forget about the specified
|
||||
// workload if it is currently tracking it.
|
||||
func (m *NodeMapper) UntrackWorkload(workloadID *pbresource.ID) {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
node, found := m.workloadsToNodes[workloadID.Name]
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
m.untrackWorkloadFromNode(workloadID, node)
|
||||
}
|
||||
|
||||
// untrackWorkloadFromNode will disassociate the specified workload and node.
|
||||
// This method will clean up unnecessary tracking entries if the node name
|
||||
// is no longer associated with any workloads.
|
||||
func (m *NodeMapper) untrackWorkloadFromNode(workloadID *pbresource.ID, node string) {
|
||||
foundIdx := -1
|
||||
for idx, req := range m.nodesToWorkloads[node] {
|
||||
if resource.EqualID(req.ID, workloadID) {
|
||||
foundIdx = idx
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if foundIdx != -1 {
|
||||
workloads := m.nodesToWorkloads[node]
|
||||
l := len(workloads)
|
||||
|
||||
if l == 1 {
|
||||
delete(m.nodesToWorkloads, node)
|
||||
} else if foundIdx == l-1 {
|
||||
m.nodesToWorkloads[node] = workloads[:foundIdx]
|
||||
} else if foundIdx == 0 {
|
||||
m.nodesToWorkloads[node] = workloads[1:]
|
||||
} else {
|
||||
m.nodesToWorkloads[node] = append(workloads[:foundIdx], workloads[foundIdx+1:]...)
|
||||
}
|
||||
}
|
||||
|
||||
delete(m.workloadsToNodes, workloadID.Name)
|
||||
m.b.UntrackItem(workloadID)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user