fix: criteriaInterest mutex (#1205)

Co-authored-by: Pablo Lopez <p.lopez.lpz@gmail.com>
This commit is contained in:
richΛrd 2024-08-23 10:32:38 -04:00 committed by GitHub
parent 4c3ec60da5
commit 949684092e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 10 additions and 6 deletions

View File

@ -37,7 +37,7 @@ type MissingMessageVerifier struct {
messageTracker MessageTracker
criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
criteriaInterestMu sync.Mutex
criteriaInterestMu sync.RWMutex
C <-chan *protocol.Envelope
@ -110,8 +110,13 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
select {
case <-t.C:
m.logger.Debug("checking for missing messages...")
m.criteriaInterestMu.Lock()
for _, interest := range m.criteriaInterest {
m.criteriaInterestMu.RLock()
critIntList := make([]criteriaInterest, 0, len(m.criteriaInterest))
for _, value := range m.criteriaInterest {
critIntList = append(critIntList, value)
}
m.criteriaInterestMu.RUnlock()
for _, interest := range critIntList {
select {
case <-ctx.Done():
return
@ -123,7 +128,6 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
}(interest)
}
}
m.criteriaInterestMu.Unlock()
case <-ctx.Done():
return
@ -155,8 +159,8 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter
}
m.criteriaInterestMu.Lock()
c := m.criteriaInterest[interest.contentFilter.PubsubTopic]
if c.equals(interest) {
c, ok := m.criteriaInterest[interest.contentFilter.PubsubTopic]
if ok && c.equals(interest) {
c.lastChecked = now
m.criteriaInterest[interest.contentFilter.PubsubTopic] = c
}