mirror of https://github.com/status-im/consul.git
internal/hcp: prevent write loop on telemetrystate resource updates (#20435)
* internal/hcp: prevent write loop on telemetrystate resource updates * Update controller.go Co-authored-by: Nick Cellino <nick.cellino@hashicorp.com> * internal/hcp: add assertion for looping controller --------- Co-authored-by: Nick Cellino <nick.cellino@hashicorp.com>
This commit is contained in:
parent
c029b20615
commit
9d4ad74a63
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
"google.golang.org/protobuf/types/known/anypb"
|
"google.golang.org/protobuf/types/known/anypb"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/internal/controller"
|
"github.com/hashicorp/consul/internal/controller"
|
||||||
|
@ -109,20 +110,7 @@ func (r *telemetryStateReconciler) Reconcile(ctx context.Context, rt controller.
|
||||||
state.Metrics.IncludeList = []string{tCfg.MetricsConfig.Filters.String()}
|
state.Metrics.IncludeList = []string{tCfg.MetricsConfig.Filters.String()}
|
||||||
}
|
}
|
||||||
|
|
||||||
stateData, err := anypb.New(state)
|
if err := writeTelemetryStateIfUpdated(ctx, rt, state); err != nil {
|
||||||
if err != nil {
|
|
||||||
rt.Logger.Error("error marshalling telemetry-state data", "error", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = rt.Client.Write(ctx, &pbresource.WriteRequest{Resource: &pbresource.Resource{
|
|
||||||
Id: &pbresource.ID{
|
|
||||||
Name: "global",
|
|
||||||
Type: pbhcp.TelemetryStateType,
|
|
||||||
},
|
|
||||||
Data: stateData,
|
|
||||||
}})
|
|
||||||
if err != nil {
|
|
||||||
rt.Logger.Error("error updating telemetry-state", "error", err)
|
rt.Logger.Error("error updating telemetry-state", "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -153,10 +141,33 @@ func ensureTelemetryStateDeleted(ctx context.Context, rt controller.Runtime) err
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getLinkResource returns the cluster scoped pbhcp.Link resource. If the resource is not found a nil
|
func writeTelemetryStateIfUpdated(ctx context.Context, rt controller.Runtime, state *pbhcp.TelemetryState) error {
|
||||||
// pointer and no error will be returned.
|
currentState, err := getTelemetryStateResource(ctx, rt)
|
||||||
func getLinkResource(ctx context.Context, rt controller.Runtime) (*types.DecodedLink, error) {
|
if err != nil {
|
||||||
resp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: &pbresource.ID{Name: "global", Type: pbhcp.LinkType}})
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if currentState != nil && proto.Equal(currentState.GetData(), state) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
stateData, err := anypb.New(state)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = rt.Client.Write(ctx, &pbresource.WriteRequest{Resource: &pbresource.Resource{
|
||||||
|
Id: &pbresource.ID{
|
||||||
|
Name: "global",
|
||||||
|
Type: pbhcp.TelemetryStateType,
|
||||||
|
},
|
||||||
|
Data: stateData,
|
||||||
|
}})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func getGlobalResource(ctx context.Context, rt controller.Runtime, t *pbresource.Type) (*pbresource.Resource, error) {
|
||||||
|
resp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: &pbresource.ID{Name: "global", Type: t}})
|
||||||
switch {
|
switch {
|
||||||
case status.Code(err) == codes.NotFound:
|
case status.Code(err) == codes.NotFound:
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -164,5 +175,29 @@ func getLinkResource(ctx context.Context, rt controller.Runtime) (*types.Decoded
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return resource.Decode[*pbhcp.Link](resp.GetResource())
|
return resp.GetResource(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getLinkResource returns the cluster scoped pbhcp.Link resource. If the resource is not found a nil
|
||||||
|
// pointer and no error will be returned.
|
||||||
|
func getLinkResource(ctx context.Context, rt controller.Runtime) (*types.DecodedLink, error) {
|
||||||
|
res, err := getGlobalResource(ctx, rt, pbhcp.LinkType)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if res == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return resource.Decode[*pbhcp.Link](res)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getTelemetryStateResource(ctx context.Context, rt controller.Runtime) (*types.DecodedTelemetryState, error) {
|
||||||
|
res, err := getGlobalResource(ctx, rt, pbhcp.TelemetryStateType)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if res == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return resource.Decode[*pbhcp.TelemetryState](res)
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,8 +33,10 @@ type controllerSuite struct {
|
||||||
client *rtest.Client
|
client *rtest.Client
|
||||||
rt controller.Runtime
|
rt controller.Runtime
|
||||||
|
|
||||||
ctl telemetryStateReconciler
|
ctl *controller.TestController
|
||||||
tenancies []*pbresource.Tenancy
|
tenancies []*pbresource.Tenancy
|
||||||
|
|
||||||
|
hcpMock *hcpclient.MockClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func mockHcpClientFn(t *testing.T) (*hcpclient.MockClient, link.HCPClientFn) {
|
func mockHcpClientFn(t *testing.T) (*hcpclient.MockClient, link.HCPClientFn) {
|
||||||
|
@ -55,10 +57,12 @@ func (suite *controllerSuite) SetupTest() {
|
||||||
WithTenancies(suite.tenancies...).
|
WithTenancies(suite.tenancies...).
|
||||||
Run(suite.T())
|
Run(suite.T())
|
||||||
|
|
||||||
suite.rt = controller.Runtime{
|
hcpMock, hcpClientFn := mockHcpClientFn(suite.T())
|
||||||
Client: client,
|
suite.hcpMock = hcpMock
|
||||||
Logger: testutil.Logger(suite.T()),
|
suite.ctl = controller.NewTestController(TelemetryStateController(hcpClientFn), client).
|
||||||
}
|
WithLogger(testutil.Logger(suite.T()))
|
||||||
|
|
||||||
|
suite.rt = suite.ctl.Runtime()
|
||||||
suite.client = rtest.NewClient(client)
|
suite.client = rtest.NewClient(client)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,23 +97,12 @@ func (suite *controllerSuite) TestController_Ok() {
|
||||||
mgr.SetRaftLeader(true)
|
mgr.SetRaftLeader(true)
|
||||||
go mgr.Run(suite.ctx)
|
go mgr.Run(suite.ctx)
|
||||||
|
|
||||||
linkData := &pbhcp.Link{
|
link := suite.writeLinkResource()
|
||||||
ClientId: "abc",
|
|
||||||
ClientSecret: "abc",
|
|
||||||
ResourceId: types.GenerateTestResourceID(suite.T()),
|
|
||||||
}
|
|
||||||
|
|
||||||
link := rtest.Resource(pbhcp.LinkType, "global").
|
|
||||||
WithData(suite.T(), linkData).
|
|
||||||
WithStatus(link.StatusKey, &pbresource.Status{Conditions: []*pbresource.Condition{link.ConditionLinked(linkData.ResourceId)}}).
|
|
||||||
Write(suite.T(), suite.client)
|
|
||||||
|
|
||||||
suite.T().Cleanup(suite.deleteResourceFunc(link.Id))
|
|
||||||
|
|
||||||
tsRes := suite.client.WaitForResourceExists(suite.T(), &pbresource.ID{Name: "global", Type: pbhcp.TelemetryStateType})
|
tsRes := suite.client.WaitForResourceExists(suite.T(), &pbresource.ID{Name: "global", Type: pbhcp.TelemetryStateType})
|
||||||
decodedState, err := resource.Decode[*pbhcp.TelemetryState](tsRes)
|
decodedState, err := resource.Decode[*pbhcp.TelemetryState](tsRes)
|
||||||
require.NoError(suite.T(), err)
|
require.NoError(suite.T(), err)
|
||||||
require.Equal(suite.T(), linkData.ResourceId, decodedState.GetData().ResourceId)
|
require.Equal(suite.T(), link.GetData().GetResourceId(), decodedState.GetData().ResourceId)
|
||||||
require.Equal(suite.T(), "xxx", decodedState.GetData().ClientId)
|
require.Equal(suite.T(), "xxx", decodedState.GetData().ClientId)
|
||||||
require.Equal(suite.T(), "http://localhost/test", decodedState.GetData().Metrics.Endpoint)
|
require.Equal(suite.T(), "http://localhost/test", decodedState.GetData().Metrics.Endpoint)
|
||||||
|
|
||||||
|
@ -117,6 +110,27 @@ func (suite *controllerSuite) TestController_Ok() {
|
||||||
suite.client.WaitForDeletion(suite.T(), tsRes.Id)
|
suite.client.WaitForDeletion(suite.T(), tsRes.Id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (suite *controllerSuite) TestReconcile_AvoidReconciliationWriteLoop() {
|
||||||
|
suite.hcpMock.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&hcpclient.TelemetryConfig{
|
||||||
|
MetricsConfig: &hcpclient.MetricsConfig{
|
||||||
|
Endpoint: &url.URL{
|
||||||
|
Scheme: "http",
|
||||||
|
Host: "localhost",
|
||||||
|
Path: "/test",
|
||||||
|
},
|
||||||
|
Labels: map[string]string{"foo": "bar"},
|
||||||
|
Filters: regexp.MustCompile(".*"),
|
||||||
|
},
|
||||||
|
RefreshConfig: &hcpclient.RefreshConfig{},
|
||||||
|
}, nil)
|
||||||
|
link := suite.writeLinkResource()
|
||||||
|
suite.hcpMock.EXPECT().GetObservabilitySecret(mock.Anything).Return("xxx", "yyy", nil)
|
||||||
|
suite.NoError(suite.ctl.Reconcile(context.Background(), controller.Request{ID: link.Id}))
|
||||||
|
tsRes := suite.client.WaitForResourceExists(suite.T(), &pbresource.ID{Name: "global", Type: pbhcp.TelemetryStateType})
|
||||||
|
suite.NoError(suite.ctl.Reconcile(context.Background(), controller.Request{ID: tsRes.Id}))
|
||||||
|
suite.client.RequireVersionUnchanged(suite.T(), tsRes.Id, tsRes.Version)
|
||||||
|
}
|
||||||
|
|
||||||
func (suite *controllerSuite) TestController_LinkingDisabled() {
|
func (suite *controllerSuite) TestController_LinkingDisabled() {
|
||||||
// Run the controller manager
|
// Run the controller manager
|
||||||
mgr := controller.NewManager(suite.client, suite.rt.Logger)
|
mgr := controller.NewManager(suite.client, suite.rt.Logger)
|
||||||
|
@ -138,3 +152,23 @@ func (suite *controllerSuite) TestController_LinkingDisabled() {
|
||||||
|
|
||||||
suite.client.WaitForDeletion(suite.T(), &pbresource.ID{Name: "global", Type: pbhcp.TelemetryStateType})
|
suite.client.WaitForDeletion(suite.T(), &pbresource.ID{Name: "global", Type: pbhcp.TelemetryStateType})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (suite *controllerSuite) writeLinkResource() *types.DecodedLink {
|
||||||
|
suite.T().Helper()
|
||||||
|
|
||||||
|
linkData := &pbhcp.Link{
|
||||||
|
ClientId: "abc",
|
||||||
|
ClientSecret: "abc",
|
||||||
|
ResourceId: types.GenerateTestResourceID(suite.T()),
|
||||||
|
}
|
||||||
|
|
||||||
|
res := rtest.Resource(pbhcp.LinkType, "global").
|
||||||
|
WithData(suite.T(), linkData).
|
||||||
|
WithStatus(link.StatusKey, &pbresource.Status{Conditions: []*pbresource.Condition{link.ConditionLinked(linkData.ResourceId)}}).
|
||||||
|
Write(suite.T(), suite.client)
|
||||||
|
|
||||||
|
suite.T().Cleanup(suite.deleteResourceFunc(res.Id))
|
||||||
|
link, err := resource.Decode[*pbhcp.Link](res)
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
return link
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue