diff --git a/agent/grpc-external/services/peerstream/subscription_manager_test.go b/agent/grpc-external/services/peerstream/subscription_manager_test.go index 7c4025655f..89331070d4 100644 --- a/agent/grpc-external/services/peerstream/subscription_manager_test.go +++ b/agent/grpc-external/services/peerstream/subscription_manager_test.go @@ -3,6 +3,7 @@ package peerstream import ( "context" "sort" + "sync" "testing" "time" @@ -821,9 +822,6 @@ func setupTestPeering(t *testing.T, store *state.Store, name string, index uint6 } func newStateStore(t *testing.T, publisher *stream.EventPublisher) (*state.Store, *mockSnapshotHandler) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - gc, err := state.NewTombstoneGC(time.Second, time.Millisecond) require.NoError(t, err) @@ -834,7 +832,22 @@ func newStateStore(t *testing.T, publisher *stream.EventPublisher) (*state.Store require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealthConnect, store.ServiceHealthSnapshot, false)) require.NoError(t, publisher.RegisterHandler(state.EventTopicCARoots, store.CARootsSnapshot, false)) require.NoError(t, publisher.RegisterHandler(autopilotevents.EventTopicReadyServers, handler.handle, false)) - go publisher.Run(ctx) + + // WaitGroup used to make sure that the publisher returns + // before handler's t.Cleanup is called (otherwise an event + // might fire during an assertion and cause a data race). + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(func() { + cancel() + wg.Wait() + }) + + wg.Add(1) + go func() { + publisher.Run(ctx) + wg.Done() + }() return store, handler }