From 6453375ab228aa58fb31efdaf50cd223a6a1dea9 Mon Sep 17 00:00:00 2001 From: freddygv Date: Mon, 13 Jun 2022 19:50:59 -0600 Subject: [PATCH 1/2] Add leader routine to clean up peerings Once a peering is marked for deletion a new leader routine will now clean up all imported resources and then the peering itself. A lot of the logic was grabbed from the namespace/partitions deferred deletions but with a handful of simplifications: - The rate limiting is not configurable. - Deleting imported nodes/services/checks is done by deleting nodes with the Txn API. The services and checks are deleted as a side-effect. - There is no "round rate limiter" like with namespaces and partitions. This is because peerings are purely local, and deleting a peering in the datacenter does not depend on deleting data from other DCs like with WAN-federated namespaces. All rate limiting is handled by the Raft rate limiter. --- agent/consul/enterprise_server_oss.go | 6 + agent/consul/leader.go | 15 +++ agent/consul/leader_peering.go | 151 ++++++++++++++++++++++++ agent/consul/leader_peering_test.go | 162 +++++++++++++++++++++++++- agent/consul/server.go | 1 + agent/peering_endpoint_test.go | 25 ++-- agent/rpc/peering/service_test.go | 14 ++- api/peering_test.go | 13 ++- 8 files changed, 359 insertions(+), 28 deletions(-) diff --git a/agent/consul/enterprise_server_oss.go b/agent/consul/enterprise_server_oss.go index 187d59e97f..d6e07ddd8c 100644 --- a/agent/consul/enterprise_server_oss.go +++ b/agent/consul/enterprise_server_oss.go @@ -57,6 +57,12 @@ func (s *Server) revokeEnterpriseLeadership() error { return nil } +func (s *Server) startTenancyDeferredDeletion(ctx context.Context) { +} + +func (s *Server) stopTenancyDeferredDeletion() { +} + func (s *Server) validateEnterpriseRequest(entMeta *acl.EnterpriseMeta, write bool) error { return nil } diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 0d9359c14f..d38e5015a9 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -47,6 +47,9 @@ var LeaderSummaries = []prometheus.SummaryDefinition{ const ( newLeaderEvent = "consul:new-leader" barrierWriteTimeout = 2 * time.Minute + + defaultDeletionRoundBurst int = 5 // number replication round bursts + defaultDeletionApplyRate rate.Limit = 10 // raft applies per second ) var ( @@ -313,6 +316,8 @@ func (s *Server) establishLeadership(ctx context.Context) error { s.startPeeringStreamSync(ctx) + s.startDeferredDeletion(ctx) + if err := s.startConnectLeader(ctx); err != nil { return err } @@ -751,6 +756,16 @@ func (s *Server) stopACLReplication() { s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName) } +func (s *Server) startDeferredDeletion(ctx context.Context) { + s.startPeeringDeferredDeletion(ctx) + s.startTenancyDeferredDeletion(ctx) +} + +func (s *Server) stopDeferredDeletion() { + s.leaderRoutineManager.Stop(peeringDeletionRoutineName) + s.stopTenancyDeferredDeletion() +} + func (s *Server) startConfigReplication(ctx context.Context) { if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter { // replication shouldn't run in the primary DC diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index 5efc52edf9..dea09ebed5 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -12,12 +12,17 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-uuid" + "golang.org/x/time/rate" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + 
"github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/rpc/peering" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/proto/pbpeering" ) @@ -297,3 +302,149 @@ func newPeerDialer(peerAddr string) func(context.Context, string) (net.Conn, err return conn, nil } } + +func (s *Server) startPeeringDeferredDeletion(ctx context.Context) { + s.leaderRoutineManager.Start(ctx, peeringDeletionRoutineName, s.runPeeringDeletions) +} + +// runPeeringDeletions watches for peerings marked for deletions and then cleans up data for them. +func (s *Server) runPeeringDeletions(ctx context.Context) error { + logger := s.loggers.Named(logging.Peering) + + // This limiter's purpose is to control the rate of raft applies caused by the deferred deletion + // process. This includes deletion of the peerings themselves in addition to any peering data + raftLimiter := rate.NewLimiter(defaultDeletionApplyRate, int(defaultDeletionApplyRate)) + for { + ws := memdb.NewWatchSet() + state := s.fsm.State() + _, peerings, err := s.fsm.State().PeeringListDeleted(ws) + if err != nil { + logger.Warn("encountered an error while searching for deleted peerings", "error", err) + continue + } + + if len(peerings) == 0 { + ws.Add(state.AbandonCh()) + + // wait for a peering to be deleted or the routine to be cancelled + if err := ws.WatchCtx(ctx); err != nil { + return err + } + continue + } + + for _, p := range peerings { + s.removePeeringAndData(ctx, logger, raftLimiter, p.Name, acl.PartitionOrDefault(p.Partition)) + } + } +} + +// removepPeeringAndData removes data imported for a peering and the peering itself. +func (s *Server) removePeeringAndData(ctx context.Context, logger hclog.Logger, limiter *rate.Limiter, peer string, partition string) { + logger = logger.With("peer", peer, "partition", partition) + + // First delete all imported data. + // By deleting all imported nodes we also delete all services and checks registered on them. + if err := s.deleteAllNodes(ctx, limiter, *structs.NodeEnterpriseMetaInPartition(partition), peer); err != nil { + logger.Error("Failed to remove Nodes for peer", "error", err) + return + } + if err := s.deleteTrustBundleFromPeer(ctx, limiter, *structs.NodeEnterpriseMetaInPartition(partition), peer); err != nil { + logger.Error("Failed to remove trust bundle for peer", "error", err) + return + } + + if err := limiter.Wait(ctx); err != nil { + return + } + + // Once all imported data is deleted, the peering itself is also deleted. + req := pbpeering.PeeringDeleteRequest{ + Name: peer, + Partition: partition, + } + _, err := s.raftApplyProtobuf(structs.PeeringDeleteType, &req) + if err != nil { + logger.Error("failed to apply full peering deletion", "error", err) + return + } +} + +// deleteAllNodes will delete all nodes in a partition or all nodes imported from a given peer name. 
+func (s *Server) deleteAllNodes(ctx context.Context, limiter *rate.Limiter, entMeta acl.EnterpriseMeta, peerName string) error { + // Same as ACL batch upsert size + nodeBatchSizeBytes := 256 * 1024 + + _, nodes, err := s.fsm.State().NodeDump(nil, &entMeta, peerName) + if err != nil { + return err + } + if len(nodes) == 0 { + return nil + } + + i := 0 + for { + var ops structs.TxnOps + for batchSize := 0; batchSize < nodeBatchSizeBytes && i < len(nodes); i++ { + entry := nodes[i] + + op := structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeDelete, + Node: structs.Node{ + Node: entry.Node, + Partition: entry.Partition, + PeerName: entry.PeerName, + }, + }, + } + ops = append(ops, &op) + + // Add entries to the transaction until it reaches the max batch size + batchSize += len(entry.Node) + len(entry.Partition) + } + + // Send each batch as a TXN Req to avoid sending one at a time + req := structs.TxnRequest{ + Datacenter: s.config.Datacenter, + Ops: ops, + } + if len(req.Ops) > 0 { + if err := limiter.Wait(ctx); err != nil { + return err + } + + _, err := s.raftApplyMsgpack(structs.TxnRequestType, &req) + if err != nil { + return err + } + } else { + break + } + } + + return nil +} + +// deleteTrustBundleFromPeer deletes the trust bundle imported from a peer, if present. +func (s *Server) deleteTrustBundleFromPeer(ctx context.Context, limiter *rate.Limiter, entMeta acl.EnterpriseMeta, peerName string) error { + _, bundle, err := s.fsm.State().PeeringTrustBundleRead(nil, state.Query{Value: peerName, EnterpriseMeta: entMeta}) + if err != nil { + return err + } + if bundle == nil { + return nil + } + + if err := limiter.Wait(ctx); err != nil { + return err + } + + req := pbpeering.PeeringTrustBundleDeleteRequest{ + Name: peerName, + Partition: entMeta.PartitionOrDefault(), + } + _, err = s.raftApplyProtobuf(structs.PeeringTrustBundleDeleteType, &req) + return err +} diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index feb520e146..e881b144ed 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/api" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -107,7 +109,7 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) { Value: "my-peer-s2", }) require.NoError(r, err) - require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State) + require.Nil(r, peering) }) } @@ -196,6 +198,162 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) { Value: "my-peer-s1", }) require.NoError(r, err) - require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State) + require.Nil(r, peering) }) } + +func TestLeader_Peering_DeferredDeletion(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + // TODO(peering): Configure with TLS + _, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "s1.dc1" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + }) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + var ( + peerName = "my-peer-s2" + defaultMeta = acl.DefaultEnterpriseMeta() + lastIdx = uint64(0) + ) + + // Simulate a peering initiation event by writing a peering to the state store. 
+ lastIdx++ + require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + Name: peerName, + })) + + // Insert imported data: nodes, services, checks, trust bundle + lastIdx = insertTestPeeringData(t, s1.fsm.State(), peerName, lastIdx) + + // Mark the peering for deletion to trigger the termination sequence. + lastIdx++ + require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + Name: peerName, + DeletedAt: structs.TimeToProto(time.Now()), + })) + + // Ensure imported data is gone: + retry.Run(t, func(r *retry.R) { + _, csn, err := s1.fsm.State().ServiceDump(nil, "", false, defaultMeta, peerName) + require.NoError(r, err) + require.Len(r, csn, 0) + + _, checks, err := s1.fsm.State().ChecksInState(nil, api.HealthAny, defaultMeta, peerName) + require.NoError(r, err) + require.Len(r, checks, 0) + + _, nodes, err := s1.fsm.State().NodeDump(nil, defaultMeta, peerName) + require.NoError(r, err) + require.Len(r, nodes, 0) + + _, tb, err := s1.fsm.State().PeeringTrustBundleRead(nil, state.Query{Value: peerName}) + require.NoError(r, err) + require.Nil(r, tb) + }) + + // The leader routine should pick up the deletion and finish deleting the peering. + retry.Run(t, func(r *retry.R) { + _, peering, err := s1.fsm.State().PeeringRead(nil, state.Query{ + Value: peerName, + }) + require.NoError(r, err) + require.Nil(r, peering) + }) +} + +func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastIdx uint64) uint64 { + lastIdx++ + require.NoError(t, store.PeeringTrustBundleWrite(lastIdx, &pbpeering.PeeringTrustBundle{ + TrustDomain: "952e6bd1-f4d6-47f7-83ff-84b31babaa17", + PeerName: peer, + RootPEMs: []string{"certificate bundle"}, + })) + + lastIdx++ + require.NoError(t, store.EnsureRegistration(lastIdx, &structs.RegisterRequest{ + Node: "aaa", + Address: "10.0.0.1", + PeerName: peer, + Service: &structs.NodeService{ + Service: "a-service", + ID: "a-service-1", + Port: 8080, + PeerName: peer, + }, + Checks: structs.HealthChecks{ + { + CheckID: "a-service-1-check", + ServiceName: "a-service", + ServiceID: "a-service-1", + Node: "aaa", + PeerName: peer, + }, + { + CheckID: structs.SerfCheckID, + Node: "aaa", + PeerName: peer, + }, + }, + })) + + lastIdx++ + require.NoError(t, store.EnsureRegistration(lastIdx, &structs.RegisterRequest{ + Node: "bbb", + Address: "10.0.0.2", + PeerName: peer, + Service: &structs.NodeService{ + Service: "b-service", + ID: "b-service-1", + Port: 8080, + PeerName: peer, + }, + Checks: structs.HealthChecks{ + { + CheckID: "b-service-1-check", + ServiceName: "b-service", + ServiceID: "b-service-1", + Node: "bbb", + PeerName: peer, + }, + { + CheckID: structs.SerfCheckID, + Node: "bbb", + PeerName: peer, + }, + }, + })) + + lastIdx++ + require.NoError(t, store.EnsureRegistration(lastIdx, &structs.RegisterRequest{ + Node: "ccc", + Address: "10.0.0.3", + PeerName: peer, + Service: &structs.NodeService{ + Service: "c-service", + ID: "c-service-1", + Port: 8080, + PeerName: peer, + }, + Checks: structs.HealthChecks{ + { + CheckID: "c-service-1-check", + ServiceName: "c-service", + ServiceID: "c-service-1", + Node: "ccc", + PeerName: peer, + }, + { + CheckID: structs.SerfCheckID, + Node: "ccc", + PeerName: peer, + }, + }, + })) + + return lastIdx +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 344efb1679..0b8702cbae 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -126,6 +126,7 @@ const ( backgroundCAInitializationRoutineName = "CA initialization" virtualIPCheckRoutineName = "virtual IP 
version check" peeringStreamsRoutineName = "streaming peering resources" + peeringDeletionRoutineName = "peering deferred deletion" ) var ( diff --git a/agent/peering_endpoint_test.go b/agent/peering_endpoint_test.go index 412ea4ae44..07dd9bfaf8 100644 --- a/agent/peering_endpoint_test.go +++ b/agent/peering_endpoint_test.go @@ -12,13 +12,12 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/encoding/protojson" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto/pbpeering" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" + "github.com/stretchr/testify/require" ) var validCA = ` @@ -344,18 +343,14 @@ func TestHTTP_Peering_Delete(t *testing.T) { require.Equal(t, "", resp.Body.String()) }) - t.Run("now the token is marked for deletion", func(t *testing.T) { - req, err := http.NewRequest("GET", "/v1/peering/foo", nil) - require.NoError(t, err) - resp := httptest.NewRecorder() - a.srv.h.ServeHTTP(resp, req) - require.Equal(t, http.StatusOK, resp.Code) - - var p pbpeering.Peering - body, err := io.ReadAll(resp.Body) - require.NoError(t, err) - require.NoError(t, protojson.Unmarshal(body, &p)) - require.False(t, p.IsActive()) + t.Run("now the token is deleted and reads should yield a 404", func(t *testing.T) { + retry.Run(t, func(r *retry.R) { + req, err := http.NewRequest("GET", "/v1/peering/foo", nil) + require.NoError(t, err) + resp := httptest.NewRecorder() + a.srv.h.ServeHTTP(resp, req) + require.Equal(r, http.StatusNotFound, resp.Code) + }) }) t.Run("delete a token that does not exist", func(t *testing.T) { diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index 2cedfc80b2..1e462dd9e7 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" @@ -297,11 +298,14 @@ func TestPeeringService_Delete(t *testing.T) { _, err = client.PeeringDelete(ctx, &pbpeering.PeeringDeleteRequest{Name: "foo"}) require.NoError(t, err) - // "foo" peering must only be marked for deletion, rather than actually be deleted. - _, resp, err := s.Server.FSM().State().PeeringRead(nil, state.Query{Value: "foo"}) - require.NoError(t, err) - require.NotNil(t, resp.DeletedAt) - require.False(t, resp.IsActive()) + retry.Run(t, func(r *retry.R) { + _, resp, err := s.Server.FSM().State().PeeringRead(nil, state.Query{Value: "foo"}) + require.NoError(r, err) + + // Initially the peering will be marked for deletion but eventually the leader + // routine will clean it up. 
+ require.Nil(r, resp) + }) } func TestPeeringService_List(t *testing.T) { diff --git a/api/peering_test.go b/api/peering_test.go index ea93e63454..015da723f3 100644 --- a/api/peering_test.go +++ b/api/peering_test.go @@ -212,11 +212,12 @@ func TestAPI_Peering_GenerateToken_Read_Establish_Delete(t *testing.T) { require.NoError(t, err) require.NotNil(t, wm) - // Read to see if the token is marked for deletion - resp, qm, err := c.Peerings().Read(ctx, "peer1", nil) - require.NoError(t, err) - require.NotNil(t, qm) - require.NotNil(t, resp) - require.False(t, resp.DeletedAt.IsZero()) + // Read to see if the token is gone + retry.Run(t, func(r *retry.R) { + resp, qm, err := c.Peerings().Read(ctx, "peer1", nil) + require.NoError(r, err) + require.NotNil(r, qm) + require.Nil(r, resp) + }) }) } From f3843809da648a29fb4c0ae36de5af6ee80c912f Mon Sep 17 00:00:00 2001 From: freddygv Date: Tue, 14 Jun 2022 08:39:23 -0600 Subject: [PATCH 2/2] Avoid deleting peerings marked as terminated. When our peer deletes the peering it is locally marked as terminated. This termination should kick off deleting all imported data, but should not delete the peering object itself. Keeping peerings marked as terminated acts as a signal that the action took place. --- agent/consul/leader_peering.go | 35 +++++++++++++++++------------ agent/consul/leader_peering_test.go | 4 ++-- agent/peering_endpoint_test.go | 5 +++-- agent/rpc/peering/service_test.go | 2 +- 4 files changed, 27 insertions(+), 19 deletions(-) diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index dea09ebed5..f698e48ec8 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -120,7 +120,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, logger.Trace("evaluating stored peer", "peer", peer.Name, "should_dial", peer.ShouldDial(), "sequence_id", seq) if !peer.IsActive() { - // The peering is marked for deletion, no need to dial or track them. + // The peering was marked for deletion by ourselves or our peer, no need to dial or track them. continue } @@ -334,22 +334,23 @@ func (s *Server) runPeeringDeletions(ctx context.Context) error { } for _, p := range peerings { - s.removePeeringAndData(ctx, logger, raftLimiter, p.Name, acl.PartitionOrDefault(p.Partition)) + s.removePeeringAndData(ctx, logger, raftLimiter, p) } } } // removepPeeringAndData removes data imported for a peering and the peering itself. -func (s *Server) removePeeringAndData(ctx context.Context, logger hclog.Logger, limiter *rate.Limiter, peer string, partition string) { - logger = logger.With("peer", peer, "partition", partition) +func (s *Server) removePeeringAndData(ctx context.Context, logger hclog.Logger, limiter *rate.Limiter, peer *pbpeering.Peering) { + logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID) + entMeta := *structs.NodeEnterpriseMetaInPartition(peer.Partition) // First delete all imported data. // By deleting all imported nodes we also delete all services and checks registered on them. 
- if err := s.deleteAllNodes(ctx, limiter, *structs.NodeEnterpriseMetaInPartition(partition), peer); err != nil { + if err := s.deleteAllNodes(ctx, limiter, entMeta, peer.Name); err != nil { logger.Error("Failed to remove Nodes for peer", "error", err) return } - if err := s.deleteTrustBundleFromPeer(ctx, limiter, *structs.NodeEnterpriseMetaInPartition(partition), peer); err != nil { + if err := s.deleteTrustBundleFromPeer(ctx, limiter, entMeta, peer.Name); err != nil { logger.Error("Failed to remove trust bundle for peer", "error", err) return } @@ -358,12 +359,18 @@ func (s *Server) removePeeringAndData(ctx context.Context, logger hclog.Logger, return } - // Once all imported data is deleted, the peering itself is also deleted. - req := pbpeering.PeeringDeleteRequest{ - Name: peer, - Partition: partition, + if peer.State == pbpeering.PeeringState_TERMINATED { + // For peerings terminated by our peer we only clean up the local data, we do not delete the peering itself. + // This is to avoid a situation where the peering disappears without the local operator's knowledge. + return } - _, err := s.raftApplyProtobuf(structs.PeeringDeleteType, &req) + + // Once all imported data is deleted, the peering itself is also deleted. + req := &pbpeering.PeeringDeleteRequest{ + Name: peer.Name, + Partition: acl.PartitionOrDefault(peer.Partition), + } + _, err := s.raftApplyProtobuf(structs.PeeringDeleteType, req) if err != nil { logger.Error("failed to apply full peering deletion", "error", err) return @@ -402,7 +409,7 @@ func (s *Server) deleteAllNodes(ctx context.Context, limiter *rate.Limiter, entM ops = append(ops, &op) // Add entries to the transaction until it reaches the max batch size - batchSize += len(entry.Node) + len(entry.Partition) + batchSize += len(entry.Node) + len(entry.Partition) + len(entry.PeerName) } // Send each batch as a TXN Req to avoid sending one at a time @@ -441,10 +448,10 @@ func (s *Server) deleteTrustBundleFromPeer(ctx context.Context, limiter *rate.Li return err } - req := pbpeering.PeeringTrustBundleDeleteRequest{ + req := &pbpeering.PeeringTrustBundleDeleteRequest{ Name: peerName, Partition: entMeta.PartitionOrDefault(), } - _, err = s.raftApplyProtobuf(structs.PeeringTrustBundleDeleteType, &req) + _, err = s.raftApplyProtobuf(structs.PeeringTrustBundleDeleteType, req) return err } diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index e881b144ed..3e2f6c8ff9 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -109,7 +109,7 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) { Value: "my-peer-s2", }) require.NoError(r, err) - require.Nil(r, peering) + require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State) }) } @@ -198,7 +198,7 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) { Value: "my-peer-s1", }) require.NoError(r, err) - require.Nil(r, peering) + require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State) }) } diff --git a/agent/peering_endpoint_test.go b/agent/peering_endpoint_test.go index 07dd9bfaf8..b9974fd934 100644 --- a/agent/peering_endpoint_test.go +++ b/agent/peering_endpoint_test.go @@ -12,12 +12,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" - "github.com/stretchr/testify/require" ) 
var validCA = ` @@ -346,7 +347,7 @@ func TestHTTP_Peering_Delete(t *testing.T) { t.Run("now the token is deleted and reads should yield a 404", func(t *testing.T) { retry.Run(t, func(r *retry.R) { req, err := http.NewRequest("GET", "/v1/peering/foo", nil) - require.NoError(t, err) + require.NoError(r, err) resp := httptest.NewRecorder() a.srv.h.ServeHTTP(resp, req) require.Equal(r, http.StatusNotFound, resp.Code) diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index 1e462dd9e7..aba7973d00 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -11,7 +11,6 @@ import ( "testing" "time" - "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" @@ -36,6 +35,7 @@ import ( "github.com/hashicorp/consul/proto/prototest" "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types"
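
The first commit message above describes the deletion flow in prose: collect the imported records, group them into transactions, and apply each batch behind a raft rate limiter. The following standalone sketch illustrates that pattern with hypothetical stand-in types and a toy apply function; only the golang.org/x/time/rate usage mirrors the patch, so treat it as an illustration of the technique rather than the actual Consul internals.

// Standalone sketch of a rate-limited, batched deletion loop. The item names and the
// apply callback are hypothetical; only the golang.org/x/time/rate usage mirrors the
// pattern added in this patch.
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// batchBySize groups items into batches whose combined key length stays under an
// approximate byte budget, similar to how deleteAllNodes caps each Txn request.
func batchBySize(items []string, budgetBytes int) [][]string {
	var batches [][]string
	for i := 0; i < len(items); {
		var batch []string
		for size := 0; size < budgetBytes && i < len(items); i++ {
			batch = append(batch, items[i])
			size += len(items[i])
		}
		batches = append(batches, batch)
	}
	return batches
}

// deleteInBatches applies one batch at a time, waiting on the limiter before each
// apply so destructive writes are not issued faster than the configured rate.
func deleteInBatches(ctx context.Context, limiter *rate.Limiter, items []string, apply func([]string) error) error {
	for _, batch := range batchBySize(items, 256*1024) {
		if err := limiter.Wait(ctx); err != nil {
			return err // context cancelled, e.g. on leadership loss
		}
		if err := apply(batch); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// 10 applies per second with a burst of 10, echoing defaultDeletionApplyRate.
	limiter := rate.NewLimiter(10, 10)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	nodes := []string{"aaa", "bbb", "ccc"}
	err := deleteInBatches(ctx, limiter, nodes, func(batch []string) error {
		fmt.Println("deleting batch:", batch)
		return nil
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}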
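
runPeeringDeletions also parks on a go-memdb watch set whenever no peerings are marked for deletion. The sketch below shows only that blocking idiom in isolation; the channel is a stand-in for the state-store watch channels (such as the AbandonCh) that the real routine registers.

// Minimal sketch of the watch-set blocking pattern: the goroutine sleeps until a
// watched channel fires or the context is cancelled. The channel here is a stand-in
// for real state-store watches.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/go-memdb"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	ws := memdb.NewWatchSet()

	// Stand-in for a state-store watch channel.
	ch := make(chan struct{})
	ws.Add(ch)

	// Simulate the watched state changing shortly after we start waiting.
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(ch)
	}()

	// WatchCtx blocks until a watched channel fires (returns nil) or the context is
	// done (returns the context error), which is how the routine waits when idle.
	if err := ws.WatchCtx(ctx); err != nil {
		fmt.Println("routine stopping:", err)
		return
	}
	fmt.Println("state changed; re-check for peerings marked for deletion")
}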