fix(communities/manager): make historyArchiveTasks a SyncMap (#3448)

Fixes #3447
This commit is contained in:
Jonathan Rainville 2023-05-05 12:40:18 -04:00 committed by GitHub
parent 4935848287
commit b7767ea63c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 19 deletions

View File

@ -75,7 +75,7 @@ type Manager struct {
torrentClient *torrent.Client torrentClient *torrent.Client
walletConfig *params.WalletConfig walletConfig *params.WalletConfig
historyArchiveTasksWaitGroup sync.WaitGroup historyArchiveTasksWaitGroup sync.WaitGroup
historyArchiveTasks map[string]chan struct{} historyArchiveTasks sync.Map // stores `chan struct{}`
periodicMemberPermissionsTasks map[string]chan struct{} periodicMemberPermissionsTasks map[string]chan struct{}
torrentTasks map[string]metainfo.Hash torrentTasks map[string]metainfo.Hash
historyArchiveDownloadTasks map[string]*HistoryArchiveDownloadTask historyArchiveDownloadTasks map[string]*HistoryArchiveDownloadTask
@ -157,7 +157,6 @@ func NewManager(identity *ecdsa.PrivateKey, db *sql.DB, encryptor *encryption.Pr
quit: make(chan struct{}), quit: make(chan struct{}),
transport: transport, transport: transport,
torrentConfig: torrentConfig, torrentConfig: torrentConfig,
historyArchiveTasks: make(map[string]chan struct{}),
periodicMemberPermissionsTasks: make(map[string]chan struct{}), periodicMemberPermissionsTasks: make(map[string]chan struct{}),
torrentTasks: make(map[string]metainfo.Hash), torrentTasks: make(map[string]metainfo.Hash),
historyArchiveDownloadTasks: make(map[string]*HistoryArchiveDownloadTask), historyArchiveDownloadTasks: make(map[string]*HistoryArchiveDownloadTask),
@ -2430,15 +2429,13 @@ func (m *Manager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics
func (m *Manager) StartHistoryArchiveTasksInterval(community *Community, interval time.Duration) { func (m *Manager) StartHistoryArchiveTasksInterval(community *Community, interval time.Duration) {
id := community.IDString() id := community.IDString()
_, exists := m.historyArchiveTasks[id] if _, exists := m.historyArchiveTasks.Load(id); exists {
if exists {
m.LogStdout("history archive tasks interval already in progres", zap.String("id", id)) m.LogStdout("history archive tasks interval already in progres", zap.String("id", id))
return return
} }
cancel := make(chan struct{}) cancel := make(chan struct{})
m.historyArchiveTasks[id] = cancel m.historyArchiveTasks.Store(id, cancel)
m.historyArchiveTasksWaitGroup.Add(1) m.historyArchiveTasksWaitGroup.Add(1)
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
@ -2479,7 +2476,7 @@ func (m *Manager) StartHistoryArchiveTasksInterval(community *Community, interva
} }
case <-cancel: case <-cancel:
m.UnseedHistoryArchiveTorrent(community.ID()) m.UnseedHistoryArchiveTorrent(community.ID())
delete(m.historyArchiveTasks, id) m.historyArchiveTasks.Delete(id)
m.historyArchiveTasksWaitGroup.Done() m.historyArchiveTasksWaitGroup.Done()
return return
} }
@ -2487,9 +2484,10 @@ func (m *Manager) StartHistoryArchiveTasksInterval(community *Community, interva
} }
func (m *Manager) StopHistoryArchiveTasksIntervals() { func (m *Manager) StopHistoryArchiveTasksIntervals() {
for _, t := range m.historyArchiveTasks { m.historyArchiveTasks.Range(func(_, task interface{}) bool {
close(t) close(task.(chan struct{})) // Need to cast to the chan
} return true
})
// Stoping archive interval tasks is async, so we need // Stoping archive interval tasks is async, so we need
// to wait for all of them to be closed before we shutdown // to wait for all of them to be closed before we shutdown
// the torrent client // the torrent client
@ -2497,10 +2495,10 @@ func (m *Manager) StopHistoryArchiveTasksIntervals() {
} }
func (m *Manager) StopHistoryArchiveTasksInterval(communityID types.HexBytes) { func (m *Manager) StopHistoryArchiveTasksInterval(communityID types.HexBytes) {
task, ok := m.historyArchiveTasks[communityID.String()] task, exists := m.historyArchiveTasks.Load(communityID.String())
if ok { if exists {
m.logger.Info("Stopping history archive tasks interval", zap.Any("id", communityID.String())) m.logger.Info("Stopping history archive tasks interval", zap.Any("id", communityID.String()))
close(task) close(task.(chan struct{})) // Need to cast to the chan
} }
} }

View File

@ -52,6 +52,16 @@ func (s *ManagerSuite) SetupTest() {
s.manager = m s.manager = m
} }
func (s *ManagerSuite) getHistoryTaksCount() int {
// sync.Map doesn't have a Len function, so we need to count manually
count := 0
s.manager.historyArchiveTasks.Range(func(_, _ interface{}) bool {
count++
return true
})
return count
}
func (s *ManagerSuite) TestCreateCommunity() { func (s *ManagerSuite) TestCreateCommunity() {
request := &requests.CreateCommunity{ request := &requests.CreateCommunity{
@ -201,7 +211,9 @@ func (s *ManagerSuite) TestStartHistoryArchiveTasksInterval() {
// Due to async exec we need to wait a bit until we check // Due to async exec we need to wait a bit until we check
// the task count. // the task count.
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
s.Require().Len(s.manager.historyArchiveTasks, 1)
count := s.getHistoryTaksCount()
s.Require().Equal(count, 1)
// We wait another 5 seconds to ensure the first tick has kicked in // We wait another 5 seconds to ensure the first tick has kicked in
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
@ -211,7 +223,8 @@ func (s *ManagerSuite) TestStartHistoryArchiveTasksInterval() {
s.manager.StopHistoryArchiveTasksInterval(community.ID()) s.manager.StopHistoryArchiveTasksInterval(community.ID())
s.manager.historyArchiveTasksWaitGroup.Wait() s.manager.historyArchiveTasksWaitGroup.Wait()
s.Require().Len(s.manager.historyArchiveTasks, 0) count = s.getHistoryTaksCount()
s.Require().Equal(count, 0)
} }
func (s *ManagerSuite) TestStopHistoryArchiveTasksIntervals() { func (s *ManagerSuite) TestStopHistoryArchiveTasksIntervals() {
@ -230,9 +243,14 @@ func (s *ManagerSuite) TestStopHistoryArchiveTasksIntervals() {
go s.manager.StartHistoryArchiveTasksInterval(community, interval) go s.manager.StartHistoryArchiveTasksInterval(community, interval)
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
s.Require().Len(s.manager.historyArchiveTasks, 1)
count := s.getHistoryTaksCount()
s.Require().Equal(count, 1)
s.manager.StopHistoryArchiveTasksIntervals() s.manager.StopHistoryArchiveTasksIntervals()
s.Require().Len(s.manager.historyArchiveTasks, 0)
count = s.getHistoryTaksCount()
s.Require().Equal(count, 0)
} }
func (s *ManagerSuite) TestStopTorrentClient_ShouldStopHistoryArchiveTasks() { func (s *ManagerSuite) TestStopTorrentClient_ShouldStopHistoryArchiveTasks() {
@ -251,11 +269,15 @@ func (s *ManagerSuite) TestStopTorrentClient_ShouldStopHistoryArchiveTasks() {
// Due to async exec we need to wait a bit until we check // Due to async exec we need to wait a bit until we check
// the task count. // the task count.
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
s.Require().Len(s.manager.historyArchiveTasks, 1)
count := s.getHistoryTaksCount()
s.Require().Equal(count, 1)
errs := s.manager.StopTorrentClient() errs := s.manager.StopTorrentClient()
s.Require().Len(errs, 0) s.Require().Len(errs, 0)
s.Require().Len(s.manager.historyArchiveTasks, 0)
count = s.getHistoryTaksCount()
s.Require().Equal(count, 0)
} }
func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_WithoutMessages() { func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_WithoutMessages() {