fix(communities): ensure archive import is only cancelled if not already marked as such
There were cases where this caused a crash, as handling magnetlinks would try to close an already closed tasked channel See https://github.com/status-im/status-desktop/issues/8996 for more information. This commit extends the task struct such that it can be marked as cancelled and safely read and written by multiple go routines.
This commit is contained in:
parent
b1960a6ca9
commit
b5964348a1
|
@ -61,8 +61,23 @@ type Manager struct {
|
|||
}
|
||||
|
||||
type HistoryArchiveDownloadTask struct {
|
||||
Cancel chan struct{}
|
||||
Waiter sync.WaitGroup
|
||||
CancelChan chan struct{}
|
||||
Waiter sync.WaitGroup
|
||||
m sync.RWMutex
|
||||
Cancelled bool
|
||||
}
|
||||
|
||||
func (t *HistoryArchiveDownloadTask) IsCancelled() bool {
|
||||
t.m.RLock()
|
||||
defer t.m.RUnlock()
|
||||
return t.Cancelled
|
||||
}
|
||||
|
||||
func (t *HistoryArchiveDownloadTask) Cancel() {
|
||||
t.m.Lock()
|
||||
defer t.m.Unlock()
|
||||
t.Cancelled = true
|
||||
close(t.CancelChan)
|
||||
}
|
||||
|
||||
func NewManager(identity *ecdsa.PrivateKey, db *sql.DB, encryptor *encryption.Protocol, logger *zap.Logger, verifier *ens.Verifier, transport *transport.Transport, torrentConfig *params.TorrentConfig) (*Manager, error) {
|
||||
|
|
|
@ -1999,8 +1999,9 @@ func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) erro
|
|||
|
||||
// Create new task
|
||||
task := &communities.HistoryArchiveDownloadTask{
|
||||
Cancel: make(chan struct{}),
|
||||
Waiter: *new(sync.WaitGroup),
|
||||
CancelChan: make(chan struct{}),
|
||||
Waiter: *new(sync.WaitGroup),
|
||||
Cancelled: false,
|
||||
}
|
||||
|
||||
m.communitiesManager.AddHistoryArchiveDownloadTask(communityID.String(), task)
|
||||
|
@ -2009,11 +2010,8 @@ func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) erro
|
|||
task.Waiter.Add(1)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
task.Waiter.Done()
|
||||
m.communitiesManager.DeleteHistoryArchiveDownloadTask(communityID.String())
|
||||
}()
|
||||
err := m.importHistoryArchives(communityID, task.Cancel)
|
||||
defer task.Waiter.Done()
|
||||
err := m.importHistoryArchives(communityID, task.CancelChan)
|
||||
if err != nil {
|
||||
m.communitiesManager.LogStdout("failed to import history archives", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -941,30 +941,28 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage
|
|||
go func(currentTask *communities.HistoryArchiveDownloadTask, communityID types.HexBytes) {
|
||||
|
||||
// Cancel ongoing download/import task
|
||||
if currentTask != nil {
|
||||
close(currentTask.Cancel)
|
||||
if currentTask != nil && !currentTask.IsCancelled() {
|
||||
currentTask.Cancel()
|
||||
currentTask.Waiter.Wait()
|
||||
}
|
||||
|
||||
// Create new task
|
||||
task := &communities.HistoryArchiveDownloadTask{
|
||||
Cancel: make(chan struct{}),
|
||||
Waiter: *new(sync.WaitGroup),
|
||||
CancelChan: make(chan struct{}),
|
||||
Waiter: *new(sync.WaitGroup),
|
||||
Cancelled: false,
|
||||
}
|
||||
|
||||
m.communitiesManager.AddHistoryArchiveDownloadTask(communityID.String(), task)
|
||||
|
||||
// this wait groups tracks the ongoing task for a particular community
|
||||
task.Waiter.Add(1)
|
||||
defer func() {
|
||||
task.Waiter.Done()
|
||||
m.communitiesManager.DeleteHistoryArchiveDownloadTask(communityID.String())
|
||||
}()
|
||||
defer task.Waiter.Done()
|
||||
|
||||
// this wait groups tracks all ongoing tasks across communities
|
||||
m.downloadHistoryArchiveTasksWaitGroup.Add(1)
|
||||
defer m.downloadHistoryArchiveTasksWaitGroup.Done()
|
||||
m.downloadAndImportHistoryArchives(communityID, magnetlink, task.Cancel)
|
||||
m.downloadAndImportHistoryArchives(communityID, magnetlink, task.CancelChan)
|
||||
}(currentTask, id)
|
||||
|
||||
return m.communitiesManager.UpdateMagnetlinkMessageClock(id, clock)
|
||||
|
@ -1186,14 +1184,15 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS
|
|||
go func(currentTask *communities.HistoryArchiveDownloadTask) {
|
||||
|
||||
// Cancel ongoing download/import task
|
||||
if currentTask != nil {
|
||||
close(currentTask.Cancel)
|
||||
if currentTask != nil && !currentTask.IsCancelled() {
|
||||
currentTask.Cancel()
|
||||
currentTask.Waiter.Wait()
|
||||
}
|
||||
|
||||
task := &communities.HistoryArchiveDownloadTask{
|
||||
Cancel: make(chan struct{}),
|
||||
Waiter: *new(sync.WaitGroup),
|
||||
CancelChan: make(chan struct{}),
|
||||
Waiter: *new(sync.WaitGroup),
|
||||
Cancelled: false,
|
||||
}
|
||||
m.communitiesManager.AddHistoryArchiveDownloadTask(community.IDString(), task)
|
||||
|
||||
|
@ -1203,7 +1202,7 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS
|
|||
m.downloadHistoryArchiveTasksWaitGroup.Add(1)
|
||||
defer m.downloadHistoryArchiveTasksWaitGroup.Done()
|
||||
|
||||
m.downloadAndImportHistoryArchives(community.ID(), magnetlink, task.Cancel)
|
||||
m.downloadAndImportHistoryArchives(community.ID(), magnetlink, task.CancelChan)
|
||||
}(currentTask)
|
||||
|
||||
clock := requestToJoinResponseProto.Community.ArchiveMagnetlinkClock
|
||||
|
|
Loading…
Reference in New Issue