mirror of
https://github.com/status-im/consul.git
synced 2025-01-21 19:20:41 +00:00
5fb6ab6a3a
* Add function to get update channel for watching HCP Link * Add MonitorHCPLink function This function can be called in a goroutine to manage the lifecycle of the HCP manager. * Update HCP Manager config in link monitor before starting This updates HCPMonitorLink so it updates the HCP manager with an HCP client and management token when a Link is upserted. * Let MonitorHCPManager handle lifecycle instead of link controller * Remove cleanup from Link controller and move it to MonitorHCPLink Previously, the Link Controller was responsible for cleaning up the HCP-related files on the file system. This change makes it so MonitorHCPLink handles this cleanup. As a result, we are able to remove the PlacementEachServer placement strategy for the Link controller because it no longer needs to do this per-node cleanup. * Remove HCP Manager dependency from Link Controller The Link controller does not need to have HCP Manager as a dependency anymore, so this removes that dependency in order to simplify the design. * Add Linked prefix to Linked status variables This is in preparation for adding a new status type to the Link resource. * Add new "validated" status type to link resource The link resource controller will now set a "validated" status in addition to the "linked" status. This is needed so that other components (eg the HCP manager) know when the Link is ready to link with HCP. * Fix tests * Handle new 'EndOfSnapshot' WatchList event * Fix watch test * Remove unnecessary config from TestAgent_scadaProvider Since the Scada provider is now started on agent startup regardless of whether a cloud config is provided, this removes the cloud config override from the relevant test. This change is not exactly related to the changes from this PR, but rather is something small and sort of related that was noticed while working on this PR. 
* Simplify link watch test and remove sleep from link watch This updates the link watch test so that it uses more mocks and does not require setting up the infrastructure for the HCP Link controller. This also removes the time.Sleep delay in the link watcher loop in favor of an error counter. When we receive 10 consecutive errors, we shut down the link watcher loop. * Add better logging for link validation. Remove EndOfSnapshot test. * Refactor link monitor test into a table test * Add some clarifying comments to link monitor * Simplify link watch test * Test a bunch more error cases in link monitor test * Use exponential backoff instead of errorCounter in LinkWatch * Move link watch and link monitor into a single goroutine called from server.go * Refactor HCP link watcher to use a single goroutine. Previously, if the WatchClient errored, we would never have recovered because we never retried creating the stream. With this change, we have a single goroutine that runs for the life of the server agent and if the WatchClient stream ever errors, we retry the creation of the stream with an exponential backoff.
441 lines
13 KiB
Go
441 lines
13 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package hcp
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/mock"
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/net/context"
|
|
"google.golang.org/protobuf/types/known/anypb"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
|
|
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
|
"github.com/hashicorp/consul/agent/hcp/config"
|
|
"github.com/hashicorp/consul/agent/hcp/scada"
|
|
hcpctl "github.com/hashicorp/consul/internal/hcp"
|
|
pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2"
|
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
|
"github.com/hashicorp/consul/sdk/testutil"
|
|
)
|
|
|
|
func TestManager_MonitorHCPLink(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
logger := hclog.New(&hclog.LoggerOptions{Output: io.Discard})
|
|
|
|
mgr := NewManager(
|
|
ManagerConfig{
|
|
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
|
},
|
|
)
|
|
mockHCPClient := hcpclient.NewMockClient(t)
|
|
mockHcpClientFn := func(_ config.CloudConfig) (hcpclient.Client, error) {
|
|
return mockHCPClient, nil
|
|
}
|
|
loadMgmtTokenFn := func(ctx context.Context, logger hclog.Logger, hcpClient hcpclient.Client, dataDir string) (string, error) {
|
|
return "test-mgmt-token", nil
|
|
}
|
|
|
|
require.False(t, mgr.isRunning())
|
|
updateManagerLifecycle := HCPManagerLifecycleFn(
|
|
mgr, mockHcpClientFn,
|
|
loadMgmtTokenFn, config.CloudConfig{}, "",
|
|
)
|
|
|
|
// Set up a link
|
|
link := pbhcp.Link{
|
|
ResourceId: "abc",
|
|
ClientId: "def",
|
|
ClientSecret: "ghi",
|
|
AccessLevel: pbhcp.AccessLevel_ACCESS_LEVEL_GLOBAL_READ_WRITE,
|
|
}
|
|
linkResource, err := anypb.New(&link)
|
|
require.NoError(t, err)
|
|
updateManagerLifecycle(ctx, logger, &pbresource.WatchEvent{
|
|
Event: &pbresource.WatchEvent_Upsert_{
|
|
Upsert: &pbresource.WatchEvent_Upsert{
|
|
Resource: &pbresource.Resource{
|
|
Id: &pbresource.ID{
|
|
Name: "global",
|
|
Type: pbhcp.LinkType,
|
|
},
|
|
Status: map[string]*pbresource.Status{
|
|
hcpctl.StatusKey: {
|
|
Conditions: []*pbresource.Condition{hcpctl.ConditionValidatedSuccess},
|
|
},
|
|
},
|
|
Data: linkResource,
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
// Validate that the HCP manager is started
|
|
require.True(t, mgr.isRunning())
|
|
}
|
|
|
|
func TestManager_Start(t *testing.T) {
|
|
client := hcpclient.NewMockClient(t)
|
|
statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) {
|
|
return hcpclient.ServerStatus{ID: t.Name()}, nil
|
|
}
|
|
upsertManagementTokenCalled := make(chan struct{}, 1)
|
|
upsertManagementTokenF := func(name, secretID string) error {
|
|
upsertManagementTokenCalled <- struct{}{}
|
|
return nil
|
|
}
|
|
updateCh := make(chan struct{}, 1)
|
|
client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once()
|
|
|
|
cloudCfg := config.CloudConfig{
|
|
ResourceID: "resource-id",
|
|
NodeID: "node-1",
|
|
ManagementToken: "fake-token",
|
|
}
|
|
scadaM := scada.NewMockProvider(t)
|
|
scadaM.EXPECT().UpdateHCPConfig(cloudCfg).Return(nil).Once()
|
|
scadaM.EXPECT().UpdateMeta(
|
|
map[string]string{
|
|
"consul_server_id": string(cloudCfg.NodeID),
|
|
},
|
|
).Return().Once()
|
|
scadaM.EXPECT().Start().Return(nil)
|
|
|
|
telemetryM := NewMockTelemetryProvider(t)
|
|
telemetryM.EXPECT().Start(
|
|
mock.Anything, &HCPProviderCfg{
|
|
HCPClient: client,
|
|
HCPConfig: &cloudCfg,
|
|
},
|
|
).Return(nil).Once()
|
|
|
|
mgr := NewManager(
|
|
ManagerConfig{
|
|
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
|
StatusFn: statusF,
|
|
ManagementTokenUpserterFn: upsertManagementTokenF,
|
|
SCADAProvider: scadaM,
|
|
TelemetryProvider: telemetryM,
|
|
},
|
|
)
|
|
mgr.testUpdateSent = updateCh
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
|
|
mgr.UpdateConfig(client, cloudCfg)
|
|
mgr.Start(ctx)
|
|
select {
|
|
case <-updateCh:
|
|
case <-time.After(time.Second):
|
|
require.Fail(t, "manager did not send update in expected time")
|
|
}
|
|
|
|
select {
|
|
case <-upsertManagementTokenCalled:
|
|
case <-time.After(time.Second):
|
|
require.Fail(t, "manager did not upsert management token in expected time")
|
|
}
|
|
|
|
// Make sure after manager has stopped no more statuses are pushed.
|
|
cancel()
|
|
client.AssertExpectations(t)
|
|
}
|
|
|
|
func TestManager_StartMultipleTimes(t *testing.T) {
|
|
client := hcpclient.NewMockClient(t)
|
|
statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) {
|
|
return hcpclient.ServerStatus{ID: t.Name()}, nil
|
|
}
|
|
|
|
updateCh := make(chan struct{}, 1)
|
|
client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once()
|
|
|
|
cloudCfg := config.CloudConfig{
|
|
ResourceID: "organization/85702e73-8a3d-47dc-291c-379b783c5804/project/8c0547c0-10e8-1ea2-dffe-384bee8da634/hashicorp.consul.global-network-manager.cluster/test",
|
|
NodeID: "node-1",
|
|
ManagementToken: "fake-token",
|
|
}
|
|
|
|
mgr := NewManager(
|
|
ManagerConfig{
|
|
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
|
StatusFn: statusF,
|
|
},
|
|
)
|
|
|
|
mgr.testUpdateSent = updateCh
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
|
|
// Start the manager twice concurrently, expect only one update
|
|
mgr.UpdateConfig(client, cloudCfg)
|
|
go mgr.Start(ctx)
|
|
go mgr.Start(ctx)
|
|
select {
|
|
case <-updateCh:
|
|
case <-time.After(time.Second):
|
|
require.Fail(t, "manager did not send update in expected time")
|
|
}
|
|
|
|
select {
|
|
case <-updateCh:
|
|
require.Fail(t, "manager sent an update when not expected")
|
|
case <-time.After(time.Second):
|
|
}
|
|
|
|
// Try start the manager again, still don't expect an update since already running
|
|
mgr.Start(ctx)
|
|
select {
|
|
case <-updateCh:
|
|
require.Fail(t, "manager sent an update when not expected")
|
|
case <-time.After(time.Second):
|
|
}
|
|
}
|
|
|
|
func TestManager_UpdateConfig(t *testing.T) {
|
|
client := hcpclient.NewMockClient(t)
|
|
statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) {
|
|
return hcpclient.ServerStatus{ID: t.Name()}, nil
|
|
}
|
|
|
|
updateCh := make(chan struct{}, 1)
|
|
|
|
cloudCfg := config.CloudConfig{
|
|
ResourceID: "organization/85702e73-8a3d-47dc-291c-379b783c5804/project/8c0547c0-10e8-1ea2-dffe-384bee8da634/hashicorp.consul.global-network-manager.cluster/test",
|
|
NodeID: "node-1",
|
|
}
|
|
|
|
mgr := NewManager(
|
|
ManagerConfig{
|
|
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
|
StatusFn: statusF,
|
|
CloudConfig: cloudCfg,
|
|
Client: client,
|
|
},
|
|
)
|
|
|
|
mgr.testUpdateSent = updateCh
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
|
|
// Start the manager, expect an initial status update
|
|
client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once()
|
|
mgr.Start(ctx)
|
|
select {
|
|
case <-updateCh:
|
|
case <-time.After(time.Second):
|
|
require.Fail(t, "manager did not send update in expected time")
|
|
}
|
|
|
|
// Update the cloud configuration, expect a status update
|
|
client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once()
|
|
updatedCfg := cloudCfg
|
|
updatedCfg.ManagementToken = "token"
|
|
mgr.UpdateConfig(client, updatedCfg)
|
|
select {
|
|
case <-updateCh:
|
|
case <-time.After(time.Second):
|
|
require.Fail(t, "manager did not send update in expected time")
|
|
}
|
|
|
|
// Update the client, expect a status update
|
|
updatedClient := hcpclient.NewMockClient(t)
|
|
updatedClient.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once()
|
|
mgr.UpdateConfig(updatedClient, updatedCfg)
|
|
select {
|
|
case <-updateCh:
|
|
case <-time.After(time.Second):
|
|
require.Fail(t, "manager did not send update in expected time")
|
|
}
|
|
|
|
// Update with the same values, don't expect a status update
|
|
mgr.UpdateConfig(updatedClient, updatedCfg)
|
|
select {
|
|
case <-updateCh:
|
|
require.Fail(t, "manager sent an update when not expected")
|
|
case <-time.After(time.Second):
|
|
}
|
|
}
|
|
|
|
func TestManager_SendUpdate(t *testing.T) {
|
|
client := hcpclient.NewMockClient(t)
|
|
statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) {
|
|
return hcpclient.ServerStatus{ID: t.Name()}, nil
|
|
}
|
|
updateCh := make(chan struct{}, 1)
|
|
|
|
// Expect two calls, once during run startup and again when SendUpdate is called
|
|
client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Twice()
|
|
mgr := NewManager(
|
|
ManagerConfig{
|
|
Client: client,
|
|
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
|
StatusFn: statusF,
|
|
},
|
|
)
|
|
mgr.testUpdateSent = updateCh
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
mgr.Start(ctx)
|
|
select {
|
|
case <-updateCh:
|
|
case <-time.After(time.Second):
|
|
require.Fail(t, "manager did not send update in expected time")
|
|
}
|
|
mgr.SendUpdate()
|
|
select {
|
|
case <-updateCh:
|
|
case <-time.After(time.Second):
|
|
require.Fail(t, "manager did not send update in expected time")
|
|
}
|
|
client.AssertExpectations(t)
|
|
}
|
|
|
|
func TestManager_SendUpdate_Periodic(t *testing.T) {
|
|
client := hcpclient.NewMockClient(t)
|
|
statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) {
|
|
return hcpclient.ServerStatus{ID: t.Name()}, nil
|
|
}
|
|
updateCh := make(chan struct{}, 1)
|
|
|
|
// Expect two calls, once during run startup and again when SendUpdate is called
|
|
client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Twice()
|
|
mgr := NewManager(
|
|
ManagerConfig{
|
|
Client: client,
|
|
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
|
StatusFn: statusF,
|
|
MaxInterval: time.Second,
|
|
MinInterval: 100 * time.Millisecond,
|
|
},
|
|
)
|
|
mgr.testUpdateSent = updateCh
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
mgr.Start(ctx)
|
|
select {
|
|
case <-updateCh:
|
|
case <-time.After(time.Second):
|
|
require.Fail(t, "manager did not send update in expected time")
|
|
}
|
|
select {
|
|
case <-updateCh:
|
|
case <-time.After(time.Second):
|
|
require.Fail(t, "manager did not send update in expected time")
|
|
}
|
|
client.AssertExpectations(t)
|
|
}
|
|
|
|
func TestManager_Stop(t *testing.T) {
|
|
client := hcpclient.NewMockClient(t)
|
|
|
|
// Configure status functions called in sendUpdate
|
|
statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) {
|
|
return hcpclient.ServerStatus{ID: t.Name()}, nil
|
|
}
|
|
updateCh := make(chan struct{}, 1)
|
|
client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Twice()
|
|
|
|
// Configure management token creation and cleanup
|
|
token := "test-token"
|
|
upsertManagementTokenCalled := make(chan struct{}, 1)
|
|
upsertManagementTokenF := func(name, secretID string) error {
|
|
upsertManagementTokenCalled <- struct{}{}
|
|
if secretID != token {
|
|
return fmt.Errorf("expected token %q, got %q", token, secretID)
|
|
}
|
|
return nil
|
|
}
|
|
deleteManagementTokenCalled := make(chan struct{}, 1)
|
|
deleteManagementTokenF := func(secretID string) error {
|
|
deleteManagementTokenCalled <- struct{}{}
|
|
if secretID != token {
|
|
return fmt.Errorf("expected token %q, got %q", token, secretID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Configure the SCADA provider
|
|
scadaM := scada.NewMockProvider(t)
|
|
scadaM.EXPECT().UpdateHCPConfig(mock.Anything).Return(nil).Once()
|
|
scadaM.EXPECT().UpdateMeta(mock.Anything).Return().Once()
|
|
scadaM.EXPECT().Start().Return(nil).Once()
|
|
scadaM.EXPECT().Stop().Return(nil).Once()
|
|
|
|
// Configure the telemetry provider
|
|
telemetryM := NewMockTelemetryProvider(t)
|
|
telemetryM.EXPECT().Start(mock.Anything, mock.Anything).Return(nil).Once()
|
|
telemetryM.EXPECT().Stop().Return().Once()
|
|
|
|
// Configure manager with all its dependencies
|
|
mgr := NewManager(
|
|
ManagerConfig{
|
|
Logger: testutil.Logger(t),
|
|
StatusFn: statusF,
|
|
Client: client,
|
|
ManagementTokenUpserterFn: upsertManagementTokenF,
|
|
ManagementTokenDeleterFn: deleteManagementTokenF,
|
|
SCADAProvider: scadaM,
|
|
TelemetryProvider: telemetryM,
|
|
CloudConfig: config.CloudConfig{
|
|
ManagementToken: token,
|
|
},
|
|
},
|
|
)
|
|
mgr.testUpdateSent = updateCh
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
|
|
// Start the manager
|
|
err := mgr.Start(ctx)
|
|
require.NoError(t, err)
|
|
select {
|
|
case <-updateCh:
|
|
case <-time.After(time.Second):
|
|
require.Fail(t, "manager did not send update in expected time")
|
|
}
|
|
select {
|
|
case <-upsertManagementTokenCalled:
|
|
case <-time.After(time.Second):
|
|
require.Fail(t, "manager did not create token in expected time")
|
|
}
|
|
|
|
// Send an update to ensure the manager is running in its main loop
|
|
mgr.SendUpdate()
|
|
select {
|
|
case <-updateCh:
|
|
case <-time.After(time.Second):
|
|
require.Fail(t, "manager did not send update in expected time")
|
|
}
|
|
|
|
// Stop the manager
|
|
err = mgr.Stop()
|
|
require.NoError(t, err)
|
|
|
|
// Validate that the management token delete function is called
|
|
select {
|
|
case <-deleteManagementTokenCalled:
|
|
case <-time.After(time.Millisecond * 100):
|
|
require.Fail(t, "manager did not create token in expected time")
|
|
}
|
|
|
|
// Send an update, expect no update since manager is stopped
|
|
mgr.SendUpdate()
|
|
select {
|
|
case <-updateCh:
|
|
require.Fail(t, "manager sent update after stopped")
|
|
case <-time.After(time.Second):
|
|
}
|
|
}
|