mirror of https://github.com/status-im/consul.git
Various race condition and test fixes. (#20212)
* Increase timeouts for flakey peering test. * Various test fixes. * Fix race condition in reconcilePeering. This resolves an issue where a peering object in the state store was incorrectly mutated by a function, resulting in the test being flagged as failing when the -race flag was used.
This commit is contained in:
parent
dcba25f118
commit
b8b8ad46fc
|
@ -2217,7 +2217,7 @@ func TestACL_Authorize(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
a1 := NewTestAgent(t, TestACLConfigWithParams(nil))
|
a1 := NewTestAgent(t, TestACLConfigWithParams(nil), TestAgentOpts{DisableACLBootstrapCheck: true})
|
||||||
defer a1.Shutdown()
|
defer a1.Shutdown()
|
||||||
|
|
||||||
testrpc.WaitForTestAgent(t, a1.RPC, "dc1", testrpc.WithToken(TestDefaultInitialManagementToken))
|
testrpc.WaitForTestAgent(t, a1.RPC, "dc1", testrpc.WithToken(TestDefaultInitialManagementToken))
|
||||||
|
@ -2253,7 +2253,7 @@ func TestACL_Authorize(t *testing.T) {
|
||||||
secondaryParams.ReplicationToken = secondaryParams.InitialManagementToken
|
secondaryParams.ReplicationToken = secondaryParams.InitialManagementToken
|
||||||
secondaryParams.EnableTokenReplication = true
|
secondaryParams.EnableTokenReplication = true
|
||||||
|
|
||||||
a2 := NewTestAgent(t, `datacenter = "dc2" `+TestACLConfigWithParams(secondaryParams))
|
a2 := NewTestAgent(t, `datacenter = "dc2" `+TestACLConfigWithParams(secondaryParams), TestAgentOpts{DisableACLBootstrapCheck: true})
|
||||||
defer a2.Shutdown()
|
defer a2.Shutdown()
|
||||||
|
|
||||||
addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
|
addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
|
||||||
|
|
|
@ -1175,7 +1175,7 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) {
|
||||||
|
|
||||||
func TestStreamResources_Server_AckNackNonce(t *testing.T) {
|
func TestStreamResources_Server_AckNackNonce(t *testing.T) {
|
||||||
srv, store := newTestServer(t, func(c *Config) {
|
srv, store := newTestServer(t, func(c *Config) {
|
||||||
c.incomingHeartbeatTimeout = 10 * time.Millisecond
|
c.incomingHeartbeatTimeout = 5 * time.Second
|
||||||
})
|
})
|
||||||
|
|
||||||
p := writePeeringToBeDialed(t, store, 1, "my-peer")
|
p := writePeeringToBeDialed(t, store, 1, "my-peer")
|
||||||
|
@ -1222,7 +1222,7 @@ func TestStreamResources_Server_AckNackNonce(t *testing.T) {
|
||||||
})
|
})
|
||||||
// Add in a sleep to prevent the test from flaking.
|
// Add in a sleep to prevent the test from flaking.
|
||||||
// The mock client expects certain calls to be made.
|
// The mock client expects certain calls to be made.
|
||||||
time.Sleep(50 * time.Millisecond)
|
time.Sleep(500 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that when the client doesn't send a heartbeat in time, the stream is disconnected.
|
// Test that when the client doesn't send a heartbeat in time, the stream is disconnected.
|
||||||
|
|
|
@ -30,7 +30,7 @@ func (c *MockClient) Send(r *pbpeerstream.ReplicationMessage) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *MockClient) Recv() (*pbpeerstream.ReplicationMessage, error) {
|
func (c *MockClient) Recv() (*pbpeerstream.ReplicationMessage, error) {
|
||||||
return c.RecvWithTimeout(10 * time.Millisecond)
|
return c.RecvWithTimeout(100 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeerstream.ReplicationMessage, error) {
|
func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeerstream.ReplicationMessage, error) {
|
||||||
|
|
|
@ -150,28 +150,6 @@ func assertMetricExistsWithValue(t *testing.T, respRec *httptest.ResponseRecorde
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func assertMetricsWithLabelIsNonZero(t *testing.T, respRec *httptest.ResponseRecorder, label, labelValue string) {
|
|
||||||
if respRec.Body.String() == "" {
|
|
||||||
t.Fatalf("Response body is empty.")
|
|
||||||
}
|
|
||||||
|
|
||||||
metrics := respRec.Body.String()
|
|
||||||
labelWithValueTarget := label + "=" + "\"" + labelValue + "\""
|
|
||||||
|
|
||||||
for _, line := range strings.Split(metrics, "\n") {
|
|
||||||
if len(line) < 1 || line[0] == '#' {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if strings.Contains(line, labelWithValueTarget) {
|
|
||||||
s := strings.SplitN(line, " ", 2)
|
|
||||||
if s[1] == "0" {
|
|
||||||
t.Fatalf("Metric with label provided \"%s:%s\" has the value 0", label, labelValue)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func assertMetricNotExists(t *testing.T, respRec *httptest.ResponseRecorder, metric string) {
|
func assertMetricNotExists(t *testing.T, respRec *httptest.ResponseRecorder, metric string) {
|
||||||
if respRec.Body.String() == "" {
|
if respRec.Body.String() == "" {
|
||||||
t.Fatalf("Response body is empty.")
|
t.Fatalf("Response body is empty.")
|
||||||
|
@ -241,8 +219,6 @@ func TestAgent_OneTwelveRPCMetrics(t *testing.T) {
|
||||||
assertMetricExistsWithLabels(t, respRec, metricsPrefix+"_rpc_server_call", []string{"errored", "method", "request_type", "rpc_type", "leader"})
|
assertMetricExistsWithLabels(t, respRec, metricsPrefix+"_rpc_server_call", []string{"errored", "method", "request_type", "rpc_type", "leader"})
|
||||||
// make sure we see 3 Status.Ping metrics corresponding to the calls we made above
|
// make sure we see 3 Status.Ping metrics corresponding to the calls we made above
|
||||||
assertLabelWithValueForMetricExistsNTime(t, respRec, metricsPrefix+"_rpc_server_call", "method", "Status.Ping", 3)
|
assertLabelWithValueForMetricExistsNTime(t, respRec, metricsPrefix+"_rpc_server_call", "method", "Status.Ping", 3)
|
||||||
// make sure rpc calls with elapsed time below 1ms are reported as decimal
|
|
||||||
assertMetricsWithLabelIsNonZero(t, respRec, "method", "Status.Ping")
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -764,16 +764,15 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ
|
||||||
// -- ImportedServicesCount and ExportedServicesCount
|
// -- ImportedServicesCount and ExportedServicesCount
|
||||||
// NOTE: we return a new peering with this additional data
|
// NOTE: we return a new peering with this additional data
|
||||||
func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering {
|
func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering {
|
||||||
|
cp := copyPeering(peering)
|
||||||
streamState, found := s.Tracker.StreamStatus(peering.ID)
|
streamState, found := s.Tracker.StreamStatus(peering.ID)
|
||||||
if !found {
|
if !found {
|
||||||
// TODO(peering): this may be noise on non-leaders
|
// TODO(peering): this may be noise on non-leaders
|
||||||
s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+
|
s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+
|
||||||
" exported services count or reconcile peering state", "peerID", peering.ID)
|
" exported services count or reconcile peering state", "peerID", peering.ID)
|
||||||
peering.StreamStatus = &pbpeering.StreamStatus{}
|
cp.StreamStatus = &pbpeering.StreamStatus{}
|
||||||
return peering
|
return cp
|
||||||
} else {
|
} else {
|
||||||
cp := copyPeering(peering)
|
|
||||||
|
|
||||||
// reconcile pbpeering.PeeringState_Active
|
// reconcile pbpeering.PeeringState_Active
|
||||||
if streamState.Connected {
|
if streamState.Connected {
|
||||||
cp.State = pbpeering.PeeringState_ACTIVE
|
cp.State = pbpeering.PeeringState_ACTIVE
|
||||||
|
@ -1160,6 +1159,5 @@ func validatePeer(peering *pbpeering.Peering, shouldDial bool) error {
|
||||||
func copyPeering(p *pbpeering.Peering) *pbpeering.Peering {
|
func copyPeering(p *pbpeering.Peering) *pbpeering.Peering {
|
||||||
var copyP pbpeering.Peering
|
var copyP pbpeering.Peering
|
||||||
proto.Merge(©P, p)
|
proto.Merge(©P, p)
|
||||||
|
|
||||||
return ©P
|
return ©P
|
||||||
}
|
}
|
||||||
|
|
|
@ -754,6 +754,7 @@ func TestPeeringService_Read(t *testing.T) {
|
||||||
PeerCAPems: nil,
|
PeerCAPems: nil,
|
||||||
PeerServerName: "test",
|
PeerServerName: "test",
|
||||||
PeerServerAddresses: []string{"addr1"},
|
PeerServerAddresses: []string{"addr1"},
|
||||||
|
StreamStatus: &pbpeering.StreamStatus{},
|
||||||
}
|
}
|
||||||
err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p})
|
err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -815,6 +816,7 @@ func TestPeeringService_Read_ACLEnforcement(t *testing.T) {
|
||||||
PeerCAPems: nil,
|
PeerCAPems: nil,
|
||||||
PeerServerName: "test",
|
PeerServerName: "test",
|
||||||
PeerServerAddresses: []string{"addr1"},
|
PeerServerAddresses: []string{"addr1"},
|
||||||
|
StreamStatus: &pbpeering.StreamStatus{},
|
||||||
}
|
}
|
||||||
err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p})
|
err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -879,8 +881,10 @@ func TestPeeringService_Read_Blocking(t *testing.T) {
|
||||||
PeerCAPems: nil,
|
PeerCAPems: nil,
|
||||||
PeerServerName: "test",
|
PeerServerName: "test",
|
||||||
PeerServerAddresses: []string{"addr1"},
|
PeerServerAddresses: []string{"addr1"},
|
||||||
|
StreamStatus: &pbpeering.StreamStatus{},
|
||||||
}
|
}
|
||||||
err := s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p})
|
toWrite := proto.Clone(p).(*pbpeering.Peering)
|
||||||
|
err := s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: toWrite})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
|
client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
|
||||||
|
@ -891,37 +895,44 @@ func TestPeeringService_Read_Blocking(t *testing.T) {
|
||||||
|
|
||||||
options := structs.QueryOptions{
|
options := structs.QueryOptions{
|
||||||
MinQueryIndex: lastIdx,
|
MinQueryIndex: lastIdx,
|
||||||
MaxQueryTime: 1 * time.Second,
|
MaxQueryTime: 10 * time.Second,
|
||||||
}
|
}
|
||||||
ctx, err = external.ContextWithQueryOptions(ctx, options)
|
ctx, err = external.ContextWithQueryOptions(ctx, options)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Mutate the original peering
|
// Mutate the original peering
|
||||||
p = proto.Clone(p).(*pbpeering.Peering)
|
|
||||||
p.PeerServerAddresses = append(p.PeerServerAddresses, "addr2")
|
p.PeerServerAddresses = append(p.PeerServerAddresses, "addr2")
|
||||||
|
|
||||||
// Async change to trigger update
|
// Async change to trigger update
|
||||||
marker := time.Now()
|
recvChan := make(chan *pbpeering.PeeringReadResponse)
|
||||||
|
errChan := make(chan error)
|
||||||
|
var header metadata.MD
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(100 * time.Millisecond)
|
resp, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "foo"}, gogrpc.Header(&header))
|
||||||
lastIdx++
|
if err != nil {
|
||||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p}))
|
errChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
recvChan <- resp
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var header metadata.MD
|
lastIdx++
|
||||||
resp, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "foo"}, gogrpc.Header(&header))
|
toWrite = proto.Clone(p).(*pbpeering.Peering)
|
||||||
require.NoError(t, err)
|
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: toWrite}))
|
||||||
|
|
||||||
// The query should return after the async change, but before the timeout
|
select {
|
||||||
require.True(t, time.Since(marker) >= 100*time.Millisecond)
|
case err := <-errChan:
|
||||||
require.True(t, time.Since(marker) < 1*time.Second)
|
require.NoError(t, err)
|
||||||
|
case resp := <-recvChan:
|
||||||
// Verify query results
|
meta, err := external.QueryMetaFromGRPCMeta(header)
|
||||||
meta, err := external.QueryMetaFromGRPCMeta(header)
|
require.NoError(t, err)
|
||||||
require.NoError(t, err)
|
require.Equal(t, lastIdx, meta.Index)
|
||||||
require.Equal(t, lastIdx, meta.Index)
|
resp.Peering.CreateIndex = 0
|
||||||
|
resp.Peering.ModifyIndex = 0
|
||||||
prototest.AssertDeepEqual(t, p, resp.Peering)
|
prototest.AssertDeepEqual(t, p, resp.Peering)
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Error("blocking query timed out while waiting")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPeeringService_Delete(t *testing.T) {
|
func TestPeeringService_Delete(t *testing.T) {
|
||||||
|
@ -1064,6 +1075,7 @@ func TestPeeringService_List(t *testing.T) {
|
||||||
PeerCAPems: nil,
|
PeerCAPems: nil,
|
||||||
PeerServerName: "fooservername",
|
PeerServerName: "fooservername",
|
||||||
PeerServerAddresses: []string{"addr1"},
|
PeerServerAddresses: []string{"addr1"},
|
||||||
|
StreamStatus: &pbpeering.StreamStatus{},
|
||||||
}
|
}
|
||||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: foo}))
|
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: foo}))
|
||||||
|
|
||||||
|
@ -1075,6 +1087,7 @@ func TestPeeringService_List(t *testing.T) {
|
||||||
PeerCAPems: nil,
|
PeerCAPems: nil,
|
||||||
PeerServerName: "barservername",
|
PeerServerName: "barservername",
|
||||||
PeerServerAddresses: []string{"addr1"},
|
PeerServerAddresses: []string{"addr1"},
|
||||||
|
StreamStatus: &pbpeering.StreamStatus{},
|
||||||
}
|
}
|
||||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: bar}))
|
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: bar}))
|
||||||
|
|
||||||
|
@ -1120,6 +1133,7 @@ func TestPeeringService_List(t *testing.T) {
|
||||||
PeerCAPems: nil,
|
PeerCAPems: nil,
|
||||||
PeerServerName: "bazservername",
|
PeerServerName: "bazservername",
|
||||||
PeerServerAddresses: []string{"addr1"},
|
PeerServerAddresses: []string{"addr1"},
|
||||||
|
StreamStatus: &pbpeering.StreamStatus{},
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
@ -1166,6 +1180,7 @@ func TestPeeringService_List_ACLEnforcement(t *testing.T) {
|
||||||
PeerCAPems: nil,
|
PeerCAPems: nil,
|
||||||
PeerServerName: "fooservername",
|
PeerServerName: "fooservername",
|
||||||
PeerServerAddresses: []string{"addr1"},
|
PeerServerAddresses: []string{"addr1"},
|
||||||
|
StreamStatus: &pbpeering.StreamStatus{},
|
||||||
}
|
}
|
||||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: foo}))
|
require.NoError(t, s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: foo}))
|
||||||
bar := &pbpeering.Peering{
|
bar := &pbpeering.Peering{
|
||||||
|
@ -1175,6 +1190,7 @@ func TestPeeringService_List_ACLEnforcement(t *testing.T) {
|
||||||
PeerCAPems: nil,
|
PeerCAPems: nil,
|
||||||
PeerServerName: "barservername",
|
PeerServerName: "barservername",
|
||||||
PeerServerAddresses: []string{"addr1"},
|
PeerServerAddresses: []string{"addr1"},
|
||||||
|
StreamStatus: &pbpeering.StreamStatus{},
|
||||||
}
|
}
|
||||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(15, &pbpeering.PeeringWriteRequest{Peering: bar}))
|
require.NoError(t, s.Server.FSM().State().PeeringWrite(15, &pbpeering.PeeringWriteRequest{Peering: bar}))
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"text/template"
|
"text/template"
|
||||||
"time"
|
"time"
|
||||||
|
@ -20,6 +21,7 @@ import (
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-uuid"
|
"github.com/hashicorp/go-uuid"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
|
@ -86,15 +88,32 @@ type TestAgent struct {
|
||||||
// allows the BaseDeps to be modified before starting the embedded agent
|
// allows the BaseDeps to be modified before starting the embedded agent
|
||||||
OverrideDeps func(deps *BaseDeps)
|
OverrideDeps func(deps *BaseDeps)
|
||||||
|
|
||||||
|
// Skips asserting that the ACL bootstrap has occurred. This may be required
|
||||||
|
// for various tests where multiple servers are joined later.
|
||||||
|
disableACLBootstrapCheck bool
|
||||||
|
|
||||||
// Agent is the embedded consul agent.
|
// Agent is the embedded consul agent.
|
||||||
// It is valid after Start().
|
// It is valid after Start().
|
||||||
*Agent
|
*Agent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TestAgentOpts struct {
|
||||||
|
// Skips asserting that the ACL bootstrap has occurred. This may be required
|
||||||
|
// for various tests where multiple servers are joined later.
|
||||||
|
DisableACLBootstrapCheck bool
|
||||||
|
}
|
||||||
|
|
||||||
// NewTestAgent returns a started agent with the given configuration. It fails
|
// NewTestAgent returns a started agent with the given configuration. It fails
|
||||||
// the test if the Agent could not be started.
|
// the test if the Agent could not be started.
|
||||||
func NewTestAgent(t *testing.T, hcl string) *TestAgent {
|
func NewTestAgent(t *testing.T, hcl string, opts ...TestAgentOpts) *TestAgent {
|
||||||
a := StartTestAgent(t, TestAgent{HCL: hcl})
|
// This varargs approach is used so that we don't have to modify all of the `NewTestAgent()` calls
|
||||||
|
// in order to introduce more optional arguments.
|
||||||
|
require.LessOrEqual(t, len(opts), 1, "NewTestAgent cannot accept more than one opts argument")
|
||||||
|
ta := TestAgent{HCL: hcl}
|
||||||
|
if len(opts) == 1 {
|
||||||
|
ta.disableACLBootstrapCheck = opts[0].DisableACLBootstrapCheck
|
||||||
|
}
|
||||||
|
a := StartTestAgent(t, ta)
|
||||||
t.Cleanup(func() { a.Shutdown() })
|
t.Cleanup(func() { a.Shutdown() })
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
@ -286,6 +305,16 @@ func (a *TestAgent) waitForUp() error {
|
||||||
continue // fail, try again
|
continue // fail, try again
|
||||||
}
|
}
|
||||||
if a.Config.Bootstrap && a.Config.ServerMode {
|
if a.Config.Bootstrap && a.Config.ServerMode {
|
||||||
|
if !a.disableACLBootstrapCheck {
|
||||||
|
if ok, err := a.isACLBootstrapped(); err != nil {
|
||||||
|
retErr = fmt.Errorf("error checking for acl bootstrap: %w", err)
|
||||||
|
continue // fail, try again
|
||||||
|
} else if !ok {
|
||||||
|
retErr = fmt.Errorf("acl system not bootstrapped yet")
|
||||||
|
continue // fail, try again
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if a.baseDeps.UseV2Resources() {
|
if a.baseDeps.UseV2Resources() {
|
||||||
args := structs.DCSpecificRequest{
|
args := structs.DCSpecificRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
|
@ -337,11 +366,56 @@ func (a *TestAgent) waitForUp() error {
|
||||||
}
|
}
|
||||||
return nil // success
|
return nil // success
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("unavailable. last error: %v", retErr)
|
return fmt.Errorf("unavailable. last error: %v", retErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *TestAgent) isACLBootstrapped() (bool, error) {
|
||||||
|
if a.config.ACLInitialManagementToken == "" {
|
||||||
|
logger := a.Agent.logger.Named("test")
|
||||||
|
logger.Warn("Skipping check for ACL bootstrapping")
|
||||||
|
|
||||||
|
return true, nil // We lie because we can't check.
|
||||||
|
}
|
||||||
|
|
||||||
|
const policyName = structs.ACLPolicyGlobalManagementName
|
||||||
|
|
||||||
|
req := httptest.NewRequest("GET", "/v1/acl/policy/name/"+policyName, nil)
|
||||||
|
req.Header.Add("X-Consul-Token", a.config.ACLInitialManagementToken)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
|
||||||
|
raw, err := a.srv.ACLPolicyReadByName(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
if strings.Contains(err.Error(), "Unexpected response code: 403 (ACL not found)") {
|
||||||
|
return false, nil
|
||||||
|
} else if isACLNotBootstrapped(err) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if raw == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
policy, ok := raw.(*structs.ACLPolicy)
|
||||||
|
if !ok {
|
||||||
|
return false, fmt.Errorf("expected ACLPolicy got %T", raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
return policy != nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func isACLNotBootstrapped(err error) bool {
|
||||||
|
switch {
|
||||||
|
case strings.Contains(err.Error(), "ACL system must be bootstrapped before making any requests that require authorization"):
|
||||||
|
return true
|
||||||
|
case strings.Contains(err.Error(), "The ACL system is currently in legacy mode"):
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// Shutdown stops the agent and removes the data directory if it is
|
// Shutdown stops the agent and removes the data directory if it is
|
||||||
// managed by the test agent.
|
// managed by the test agent.
|
||||||
func (a *TestAgent) Shutdown() error {
|
func (a *TestAgent) Shutdown() error {
|
||||||
|
|
Loading…
Reference in New Issue