From b8b8ad46fca9190fa0e8e8455512604baf1479d3 Mon Sep 17 00:00:00 2001
From: Derek Menteer <105233703+hashi-derek@users.noreply.github.com>
Date: Tue, 16 Jan 2024 08:57:43 -0600
Subject: [PATCH] Various race condition and test fixes. (#20212)

* Increase timeouts for flaky peering test.
* Various test fixes.
* Fix race condition in reconcilePeering.

This resolves an issue where a peering object in the state store was
incorrectly mutated in place, causing tests to fail when run with the
-race flag.
---
 agent/acl_endpoint_test.go             |  4 +-
 .../services/peerstream/stream_test.go |  4 +-
 .../services/peerstream/testing.go     |  2 +-
 agent/metrics_test.go                  | 24 ------
 agent/rpc/peering/service.go           |  8 +-
 agent/rpc/peering/service_test.go      | 56 ++++++++-----
 agent/testagent.go                     | 78 ++++++++++++++++++-
 7 files changed, 120 insertions(+), 56 deletions(-)

diff --git a/agent/acl_endpoint_test.go b/agent/acl_endpoint_test.go
index 419763e5c4..0656b0882d 100644
--- a/agent/acl_endpoint_test.go
+++ b/agent/acl_endpoint_test.go
@@ -2217,7 +2217,7 @@ func TestACL_Authorize(t *testing.T) {
 	}
 
 	t.Parallel()
-	a1 := NewTestAgent(t, TestACLConfigWithParams(nil))
+	a1 := NewTestAgent(t, TestACLConfigWithParams(nil), TestAgentOpts{DisableACLBootstrapCheck: true})
 	defer a1.Shutdown()
 	testrpc.WaitForTestAgent(t, a1.RPC, "dc1", testrpc.WithToken(TestDefaultInitialManagementToken))
@@ -2253,7 +2253,7 @@ func TestACL_Authorize(t *testing.T) {
 	secondaryParams.ReplicationToken = secondaryParams.InitialManagementToken
 	secondaryParams.EnableTokenReplication = true
 
-	a2 := NewTestAgent(t, `datacenter = "dc2" `+TestACLConfigWithParams(secondaryParams))
+	a2 := NewTestAgent(t, `datacenter = "dc2" `+TestACLConfigWithParams(secondaryParams), TestAgentOpts{DisableACLBootstrapCheck: true})
 	defer a2.Shutdown()
 
 	addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go
index d5a0abd6b4..37e5e99a76 100644
--- a/agent/grpc-external/services/peerstream/stream_test.go
+++ b/agent/grpc-external/services/peerstream/stream_test.go
@@ -1175,7 +1175,7 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) {
 
 func TestStreamResources_Server_AckNackNonce(t *testing.T) {
 	srv, store := newTestServer(t, func(c *Config) {
-		c.incomingHeartbeatTimeout = 10 * time.Millisecond
+		c.incomingHeartbeatTimeout = 5 * time.Second
 	})
 
 	p := writePeeringToBeDialed(t, store, 1, "my-peer")
@@ -1222,7 +1222,7 @@ func TestStreamResources_Server_AckNackNonce(t *testing.T) {
 	})
 	// Add in a sleep to prevent the test from flaking.
 	// The mock client expects certain calls to be made.
-	time.Sleep(50 * time.Millisecond)
+	time.Sleep(500 * time.Millisecond)
 }
 
 // Test that when the client doesn't send a heartbeat in time, the stream is disconnected.
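The two stream_test.go hunks above trade tight 10ms/50ms timings for much more generous ones. A fixed sleep is a stopgap; where possible, flaky timing-based tests instead poll for the condition with a deadline. Below is a minimal, self-contained sketch of that pattern — the `waitFor` helper is hypothetical and not part of this patch or the Consul codebase.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitFor polls cond until it returns true or timeout elapses. Compared to a
// fixed sleep, this stays fast on healthy machines while tolerating slow CI
// runners, which is the usual cause of timing-based test flakes.
func waitFor(timeout, interval time.Duration, cond func() bool) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return nil
		}
		time.Sleep(interval)
	}
	return errors.New("condition not met before deadline")
}

func main() {
	start := time.Now()
	// Simulated condition that becomes true after ~200ms.
	err := waitFor(5*time.Second, 25*time.Millisecond, func() bool {
		return time.Since(start) > 200*time.Millisecond
	})
	fmt.Printf("waited %v, err=%v\n", time.Since(start).Round(time.Millisecond), err)
}
```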
diff --git a/agent/grpc-external/services/peerstream/testing.go b/agent/grpc-external/services/peerstream/testing.go
index 47cc8d1dbe..b5e79a6347 100644
--- a/agent/grpc-external/services/peerstream/testing.go
+++ b/agent/grpc-external/services/peerstream/testing.go
@@ -30,7 +30,7 @@ func (c *MockClient) Send(r *pbpeerstream.ReplicationMessage) error {
 }
 
 func (c *MockClient) Recv() (*pbpeerstream.ReplicationMessage, error) {
-	return c.RecvWithTimeout(10 * time.Millisecond)
+	return c.RecvWithTimeout(100 * time.Millisecond)
 }
 
 func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeerstream.ReplicationMessage, error) {
diff --git a/agent/metrics_test.go b/agent/metrics_test.go
index 44d561e017..fa1fc55aa2 100644
--- a/agent/metrics_test.go
+++ b/agent/metrics_test.go
@@ -150,28 +150,6 @@ func assertMetricExistsWithValue(t *testing.T, respRec *httptest.ResponseRecorder
 	}
 }
 
-func assertMetricsWithLabelIsNonZero(t *testing.T, respRec *httptest.ResponseRecorder, label, labelValue string) {
-	if respRec.Body.String() == "" {
-		t.Fatalf("Response body is empty.")
-	}
-
-	metrics := respRec.Body.String()
-	labelWithValueTarget := label + "=" + "\"" + labelValue + "\""
-
-	for _, line := range strings.Split(metrics, "\n") {
-		if len(line) < 1 || line[0] == '#' {
-			continue
-		}
-
-		if strings.Contains(line, labelWithValueTarget) {
-			s := strings.SplitN(line, " ", 2)
-			if s[1] == "0" {
-				t.Fatalf("Metric with label provided \"%s:%s\" has the value 0", label, labelValue)
-			}
-		}
-	}
-}
-
 func assertMetricNotExists(t *testing.T, respRec *httptest.ResponseRecorder, metric string) {
 	if respRec.Body.String() == "" {
 		t.Fatalf("Response body is empty.")
@@ -241,8 +219,6 @@ func TestAgent_OneTwelveRPCMetrics(t *testing.T) {
 		assertMetricExistsWithLabels(t, respRec, metricsPrefix+"_rpc_server_call", []string{"errored", "method", "request_type", "rpc_type", "leader"})
 		// make sure we see 3 Status.Ping metrics corresponding to the calls we made above
 		assertLabelWithValueForMetricExistsNTime(t, respRec, metricsPrefix+"_rpc_server_call", "method", "Status.Ping", 3)
-		// make sure rpc calls with elapsed time below 1ms are reported as decimal
-		assertMetricsWithLabelIsNonZero(t, respRec, "method", "Status.Ping")
 	})
 }
 
diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go
index 7f59c2941c..2c6655be66 100644
--- a/agent/rpc/peering/service.go
+++ b/agent/rpc/peering/service.go
@@ -764,16 +764,15 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequest
 //   -- ImportedServicesCount and ExportedServicesCount
 // NOTE: we return a new peering with this additional data
 func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering {
+	cp := copyPeering(peering)
 	streamState, found := s.Tracker.StreamStatus(peering.ID)
 	if !found {
 		// TODO(peering): this may be noise on non-leaders
 		s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+
 			" exported services count or reconcile peering state", "peerID", peering.ID)
-		peering.StreamStatus = &pbpeering.StreamStatus{}
-		return peering
+		cp.StreamStatus = &pbpeering.StreamStatus{}
+		return cp
 	} else {
-		cp := copyPeering(peering)
-
 		// reconcile pbpeering.PeeringState_Active
 		if streamState.Connected {
 			cp.State = pbpeering.PeeringState_ACTIVE
@@ -1160,6 +1159,5 @@ func validatePeer(peering *pbpeering.Peering, shouldDial bool) error {
 func copyPeering(p *pbpeering.Peering) *pbpeering.Peering {
 	var copyP pbpeering.Peering
 	proto.Merge(&copyP, p)
-
 	return &copyP
 }
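The service.go hunk above is the core race fix: `reconcilePeering` previously mutated the `*pbpeering.Peering` it was handed — an object still aliased by the state store — on the not-found path, and only copied on the found path. Moving `copyPeering` to the top makes every path mutate a private copy. Here is a minimal sketch of that copy-before-mutate pattern, with a plain struct standing in for the protobuf message; `Peering`, `clone`, and `reconcile` are illustrative stand-ins, not the real types (the real `copyPeering` deep-copies via `proto.Merge`).

```go
package main

import "fmt"

// Peering is a stand-in for pbpeering.Peering.
type Peering struct {
	ID    string
	State string
}

// clone mirrors copyPeering: build a fresh value, never alias the original.
func clone(p *Peering) *Peering {
	cp := *p
	return &cp
}

// reconcile returns a decorated copy and leaves the shared input untouched,
// matching the fixed reconcilePeering: copy first, then mutate on every path.
func reconcile(p *Peering, connected bool) *Peering {
	cp := clone(p) // copy up front so no branch can mutate the shared object
	if connected {
		cp.State = "ACTIVE"
	}
	return cp
}

func main() {
	shared := &Peering{ID: "x", State: "PENDING"} // imagine this lives in the state store
	out := reconcile(shared, true)
	fmt.Println(shared.State, out.State) // PENDING ACTIVE: the stored object is unchanged
}
```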
diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go
index b2b2075157..5d78588a34 100644
--- a/agent/rpc/peering/service_test.go
+++ b/agent/rpc/peering/service_test.go
@@ -754,6 +754,7 @@ func TestPeeringService_Read(t *testing.T) {
 		PeerCAPems:          nil,
 		PeerServerName:      "test",
 		PeerServerAddresses: []string{"addr1"},
+		StreamStatus:        &pbpeering.StreamStatus{},
 	}
 	err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p})
 	require.NoError(t, err)
@@ -815,6 +816,7 @@ func TestPeeringService_Read_ACLEnforcement(t *testing.T) {
 		PeerCAPems:          nil,
 		PeerServerName:      "test",
 		PeerServerAddresses: []string{"addr1"},
+		StreamStatus:        &pbpeering.StreamStatus{},
 	}
 	err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p})
 	require.NoError(t, err)
@@ -879,8 +881,10 @@ func TestPeeringService_Read_Blocking(t *testing.T) {
 		PeerCAPems:          nil,
 		PeerServerName:      "test",
 		PeerServerAddresses: []string{"addr1"},
+		StreamStatus:        &pbpeering.StreamStatus{},
 	}
-	err := s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p})
+	toWrite := proto.Clone(p).(*pbpeering.Peering)
+	err := s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: toWrite})
 	require.NoError(t, err)
 
 	client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
@@ -891,37 +895,44 @@ func TestPeeringService_Read_Blocking(t *testing.T) {
 	options := structs.QueryOptions{
 		MinQueryIndex: lastIdx,
-		MaxQueryTime:  1 * time.Second,
+		MaxQueryTime:  10 * time.Second,
 	}
 	ctx, err = external.ContextWithQueryOptions(ctx, options)
 	require.NoError(t, err)
 
 	// Mutate the original peering
-	p = proto.Clone(p).(*pbpeering.Peering)
 	p.PeerServerAddresses = append(p.PeerServerAddresses, "addr2")
 
 	// Async change to trigger update
-	marker := time.Now()
+	recvChan := make(chan *pbpeering.PeeringReadResponse)
+	errChan := make(chan error)
+	var header metadata.MD
 	go func() {
-		time.Sleep(100 * time.Millisecond)
-		lastIdx++
-		require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p}))
+		resp, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "foo"}, gogrpc.Header(&header))
+		if err != nil {
+			errChan <- err
+			return
+		}
+		recvChan <- resp
 	}()
 
-	var header metadata.MD
-	resp, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "foo"}, gogrpc.Header(&header))
-	require.NoError(t, err)
+	lastIdx++
+	toWrite = proto.Clone(p).(*pbpeering.Peering)
+	require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: toWrite}))
 
-	// The query should return after the async change, but before the timeout
-	require.True(t, time.Since(marker) >= 100*time.Millisecond)
-	require.True(t, time.Since(marker) < 1*time.Second)
-
-	// Verify query results
-	meta, err := external.QueryMetaFromGRPCMeta(header)
-	require.NoError(t, err)
-	require.Equal(t, lastIdx, meta.Index)
-
-	prototest.AssertDeepEqual(t, p, resp.Peering)
+	select {
+	case err := <-errChan:
+		require.NoError(t, err)
+	case resp := <-recvChan:
+		meta, err := external.QueryMetaFromGRPCMeta(header)
+		require.NoError(t, err)
+		require.Equal(t, lastIdx, meta.Index)
+		resp.Peering.CreateIndex = 0
+		resp.Peering.ModifyIndex = 0
+		prototest.AssertDeepEqual(t, p, resp.Peering)
+	case <-time.After(2 * time.Second):
+		t.Error("blocking query timed out while waiting")
+	}
 }
 
 func TestPeeringService_Delete(t *testing.T) {
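The TestPeeringService_Read_Blocking rewrite above replaces wall-clock assertions (`time.Since(marker)`) with a goroutine that issues the blocking read, channels that report its outcome, and a `select` that bounds the total wait. A self-contained sketch of that shape follows, with `fakeBlockingRead` standing in for the real `client.PeeringRead` call.

```go
package main

import (
	"fmt"
	"time"
)

// fakeBlockingRead stands in for a blocking query like client.PeeringRead:
// it returns once the watched data changes (simulated here with a sleep).
func fakeBlockingRead() (string, error) {
	time.Sleep(100 * time.Millisecond)
	return "peering with addr2", nil
}

func main() {
	recvChan := make(chan string, 1)
	errChan := make(chan error, 1)

	// Start the blocking call first, then perform the write that should wake
	// it. Reporting over channels lets the caller bound the whole wait with a
	// select instead of asserting on elapsed wall-clock time.
	go func() {
		resp, err := fakeBlockingRead()
		if err != nil {
			errChan <- err
			return
		}
		recvChan <- resp
	}()

	// ... the test would bump the index and write the mutated object here ...

	select {
	case err := <-errChan:
		fmt.Println("query failed:", err)
	case resp := <-recvChan:
		fmt.Println("query returned:", resp)
	case <-time.After(2 * time.Second):
		fmt.Println("blocking query timed out while waiting")
	}
}
```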
@@ -1064,6 +1075,7 @@ func TestPeeringService_List(t *testing.T) {
 		PeerCAPems:          nil,
 		PeerServerName:      "fooservername",
 		PeerServerAddresses: []string{"addr1"},
+		StreamStatus:        &pbpeering.StreamStatus{},
 	}
 	require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: foo}))
@@ -1075,6 +1087,7 @@ func TestPeeringService_List(t *testing.T) {
 		PeerCAPems:          nil,
 		PeerServerName:      "barservername",
 		PeerServerAddresses: []string{"addr1"},
+		StreamStatus:        &pbpeering.StreamStatus{},
 	}
 	require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: bar}))
@@ -1120,6 +1133,7 @@ func TestPeeringService_List(t *testing.T) {
 		PeerCAPems:          nil,
 		PeerServerName:      "bazservername",
 		PeerServerAddresses: []string{"addr1"},
+		StreamStatus:        &pbpeering.StreamStatus{},
 	}
 	go func() {
 		time.Sleep(100 * time.Millisecond)
@@ -1166,6 +1180,7 @@ func TestPeeringService_List_ACLEnforcement(t *testing.T) {
 		PeerCAPems:          nil,
 		PeerServerName:      "fooservername",
 		PeerServerAddresses: []string{"addr1"},
+		StreamStatus:        &pbpeering.StreamStatus{},
 	}
 	require.NoError(t, s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: foo}))
 	bar := &pbpeering.Peering{
@@ -1175,6 +1190,7 @@ func TestPeeringService_List_ACLEnforcement(t *testing.T) {
 		PeerCAPems:          nil,
 		PeerServerName:      "barservername",
 		PeerServerAddresses: []string{"addr1"},
+		StreamStatus:        &pbpeering.StreamStatus{},
 	}
 	require.NoError(t, s.Server.FSM().State().PeeringWrite(15, &pbpeering.PeeringWriteRequest{Peering: bar}))
diff --git a/agent/testagent.go b/agent/testagent.go
index 49001cbdcb..a18dee1ead 100644
--- a/agent/testagent.go
+++ b/agent/testagent.go
@@ -13,6 +13,7 @@ import (
 	"net/http/httptest"
 	"path/filepath"
 	"strconv"
+	"strings"
 	"testing"
 	"text/template"
 	"time"
@@ -20,6 +21,7 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-uuid"
+	"github.com/stretchr/testify/require"
 
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/agent/config"
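The testagent.go hunk that follows threads a new option through `NewTestAgent` via a variadic `opts ...TestAgentOpts` parameter, so the many existing two-argument call sites keep compiling while new callers can opt in. A minimal sketch of that options pattern is below; `New` and `Opts` are hypothetical stand-ins, not the patch's code.

```go
package main

import "fmt"

// Opts mirrors the shape of TestAgentOpts: a plain struct of optional knobs.
type Opts struct {
	DisableCheck bool
}

// New accepts zero or one Opts value. The variadic parameter lets existing
// single-purpose call sites compile unchanged while new callers pass options.
func New(name string, opts ...Opts) (string, error) {
	if len(opts) > 1 {
		return "", fmt.Errorf("New accepts at most one Opts argument, got %d", len(opts))
	}
	cfg := Opts{} // defaults apply when no options are given
	if len(opts) == 1 {
		cfg = opts[0]
	}
	return fmt.Sprintf("%s (DisableCheck=%v)", name, cfg.DisableCheck), nil
}

func main() {
	a, _ := New("agent1")                           // old call site, untouched
	b, _ := New("agent2", Opts{DisableCheck: true}) // new call site with options
	fmt.Println(a)
	fmt.Println(b)
}
```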
@@ -86,15 +88,32 @@ type TestAgent struct {
 	// allows the BaseDeps to be modified before starting the embedded agent
 	OverrideDeps func(deps *BaseDeps)
 
+	// Skips asserting that the ACL bootstrap has occurred. This may be required
+	// for various tests where multiple servers are joined later.
+	disableACLBootstrapCheck bool
+
 	// Agent is the embedded consul agent.
 	// It is valid after Start().
 	*Agent
 }
 
+type TestAgentOpts struct {
+	// Skips asserting that the ACL bootstrap has occurred. This may be required
+	// for various tests where multiple servers are joined later.
+	DisableACLBootstrapCheck bool
+}
+
 // NewTestAgent returns a started agent with the given configuration. It fails
 // the test if the Agent could not be started.
-func NewTestAgent(t *testing.T, hcl string) *TestAgent {
-	a := StartTestAgent(t, TestAgent{HCL: hcl})
+func NewTestAgent(t *testing.T, hcl string, opts ...TestAgentOpts) *TestAgent {
+	// This varargs approach is used so that we don't have to modify all of the
+	// `NewTestAgent()` calls in order to introduce more optional arguments.
+	require.LessOrEqual(t, len(opts), 1, "NewTestAgent cannot accept more than one opts argument")
+	ta := TestAgent{HCL: hcl}
+	if len(opts) == 1 {
+		ta.disableACLBootstrapCheck = opts[0].DisableACLBootstrapCheck
+	}
+	a := StartTestAgent(t, ta)
 	t.Cleanup(func() { a.Shutdown() })
 	return a
 }
@@ -286,6 +305,16 @@ func (a *TestAgent) waitForUp() error {
 			continue // fail, try again
 		}
 		if a.Config.Bootstrap && a.Config.ServerMode {
+			if !a.disableACLBootstrapCheck {
+				if ok, err := a.isACLBootstrapped(); err != nil {
+					retErr = fmt.Errorf("error checking for acl bootstrap: %w", err)
+					continue // fail, try again
+				} else if !ok {
+					retErr = fmt.Errorf("acl system not bootstrapped yet")
+					continue // fail, try again
+				}
+			}
+
 			if a.baseDeps.UseV2Resources() {
 				args := structs.DCSpecificRequest{
 					Datacenter: "dc1",
@@ -337,11 +366,56 @@ func (a *TestAgent) waitForUp() error {
 			}
 			return nil // success
 		}
+
 	}
 
 	return fmt.Errorf("unavailable. last error: %v", retErr)
 }
 
+func (a *TestAgent) isACLBootstrapped() (bool, error) {
+	if a.config.ACLInitialManagementToken == "" {
+		logger := a.Agent.logger.Named("test")
+		logger.Warn("Skipping check for ACL bootstrapping")
+
+		return true, nil // We lie because we can't check.
+	}
+
+	const policyName = structs.ACLPolicyGlobalManagementName
+
+	req := httptest.NewRequest("GET", "/v1/acl/policy/name/"+policyName, nil)
+	req.Header.Add("X-Consul-Token", a.config.ACLInitialManagementToken)
+	resp := httptest.NewRecorder()
+
+	raw, err := a.srv.ACLPolicyReadByName(resp, req)
+	if err != nil {
+		if strings.Contains(err.Error(), "Unexpected response code: 403 (ACL not found)") {
+			return false, nil
+		} else if isACLNotBootstrapped(err) {
+			return false, nil
+		}
+		return false, err
+	}
+	if raw == nil {
+		return false, nil
+	}
+	policy, ok := raw.(*structs.ACLPolicy)
+	if !ok {
+		return false, fmt.Errorf("expected ACLPolicy got %T", raw)
+	}
+
+	return policy != nil, nil
+}
+
+func isACLNotBootstrapped(err error) bool {
+	switch {
+	case strings.Contains(err.Error(), "ACL system must be bootstrapped before making any requests that require authorization"):
+		return true
+	case strings.Contains(err.Error(), "The ACL system is currently in legacy mode"):
+		return true
+	}
+	return false
+}
+
 // Shutdown stops the agent and removes the data directory if it is
 // managed by the test agent.
 func (a *TestAgent) Shutdown() error {
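The `isACLNotBootstrapped` helper above classifies errors by message substring because the HTTP endpoint surfaces flat error text rather than typed errors; the `waitForUp` loop then treats a "not bootstrapped" error as retryable and anything else as fatal. A self-contained sketch of that classification follows, reusing the patch's substrings; the `main` driver is purely illustrative.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// isACLNotBootstrapped mirrors the helper added above: it classifies an error
// as "retry later" by matching known server messages. Substring matching is
// brittle if upstream wording changes, but it is often the only option when
// an endpoint returns flat error text instead of typed errors.
func isACLNotBootstrapped(err error) bool {
	switch {
	case strings.Contains(err.Error(), "ACL system must be bootstrapped before making any requests that require authorization"):
		return true
	case strings.Contains(err.Error(), "The ACL system is currently in legacy mode"):
		return true
	}
	return false
}

func main() {
	retryable := errors.New("ACL system must be bootstrapped before making any requests that require authorization")
	fatal := errors.New("permission denied")
	fmt.Println(isACLNotBootstrapped(retryable)) // true: keep polling in the wait loop
	fmt.Println(isACLNotBootstrapped(fatal))     // false: surface the error
}
```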