mirror of
https://github.com/status-im/go-waku.git
synced 2025-01-13 15:24:46 +00:00
fix: update criteria context on missing msg verifier start (#1268)
This commit is contained in:
parent
78b522db50
commit
5893927f1b
@ -41,7 +41,7 @@ type MissingMessageVerifier struct {
|
|||||||
storenodeRequestor common.StorenodeRequestor
|
storenodeRequestor common.StorenodeRequestor
|
||||||
messageTracker MessageTracker
|
messageTracker MessageTracker
|
||||||
|
|
||||||
criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
|
criteriaInterest map[string]*criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
|
||||||
criteriaInterestMu sync.RWMutex
|
criteriaInterestMu sync.RWMutex
|
||||||
|
|
||||||
C chan *protocol.Envelope
|
C chan *protocol.Envelope
|
||||||
@ -66,7 +66,7 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes
|
|||||||
messageTracker: messageTracker,
|
messageTracker: messageTracker,
|
||||||
logger: logger.Named("missing-msg-verifier"),
|
logger: logger.Named("missing-msg-verifier"),
|
||||||
params: params,
|
params: params,
|
||||||
criteriaInterest: make(map[string]criteriaInterest),
|
criteriaInterest: make(map[string]*criteriaInterest),
|
||||||
C: make(chan *protocol.Envelope, 1000),
|
C: make(chan *protocol.Envelope, 1000),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -99,7 +99,7 @@ func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilt
|
|||||||
currMessageVerificationRequest.cancel()
|
currMessageVerificationRequest.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
m.criteriaInterest[contentFilter.PubsubTopic] = criteriaInterest
|
m.criteriaInterest[contentFilter.PubsubTopic] = &criteriaInterest
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MissingMessageVerifier) setRunning(running bool) {
|
func (m *MissingMessageVerifier) setRunning(running bool) {
|
||||||
@ -121,6 +121,15 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
|
|||||||
m.ctx = ctx
|
m.ctx = ctx
|
||||||
m.cancel = cancelFunc
|
m.cancel = cancelFunc
|
||||||
|
|
||||||
|
// updating context for existing criteria
|
||||||
|
m.criteriaInterestMu.Lock()
|
||||||
|
for _, value := range m.criteriaInterest {
|
||||||
|
ctx, cancel := context.WithCancel(m.ctx)
|
||||||
|
value.ctx = ctx
|
||||||
|
value.cancel = cancel
|
||||||
|
}
|
||||||
|
m.criteriaInterestMu.Unlock()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer utils.LogOnPanic()
|
defer utils.LogOnPanic()
|
||||||
t := time.NewTicker(m.params.interval)
|
t := time.NewTicker(m.params.interval)
|
||||||
@ -134,7 +143,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
|
|||||||
m.criteriaInterestMu.RLock()
|
m.criteriaInterestMu.RLock()
|
||||||
critIntList := make([]criteriaInterest, 0, len(m.criteriaInterest))
|
critIntList := make([]criteriaInterest, 0, len(m.criteriaInterest))
|
||||||
for _, value := range m.criteriaInterest {
|
for _, value := range m.criteriaInterest {
|
||||||
critIntList = append(critIntList, value)
|
critIntList = append(critIntList, *value)
|
||||||
}
|
}
|
||||||
m.criteriaInterestMu.RUnlock()
|
m.criteriaInterestMu.RUnlock()
|
||||||
for _, interest := range critIntList {
|
for _, interest := range critIntList {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user