fix_: code review

This commit is contained in:
Richard Ramos 2024-11-21 09:36:31 -04:00
parent a708ce1b5d
commit 38dce5990a
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
1 changed files with 17 additions and 6 deletions

View File

@ -815,10 +815,11 @@ loop:
// If time range is greater than 24 hours, limit the range: to - (to-24h) // If time range is greater than 24 hours, limit the range: to - (to-24h)
from := w.from from := w.from
to := w.to to := w.to
nextWorkTo := to
exceeds24h := false
if batch.To-batch.From > uint32(oneDayDuration.Seconds()) { if batch.To-batch.From > uint32(oneDayDuration.Seconds()) {
from = to - uint32(oneDayDuration.Seconds()) from = to - uint32(oneDayDuration.Seconds())
nextWorkTo = from exceeds24h = true
} }
cursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, from, to, w.cursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes) cursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, from, to, w.cursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes)
@ -843,21 +844,31 @@ loop:
// Check the cursor after calling `shouldProcessNextPage`. // Check the cursor after calling `shouldProcessNextPage`.
// The app might use process the fetched envelopes in the callback for own needs. // The app might use process the fetched envelopes in the callback for own needs.
if cursor == nil { // If from/to does not exceed 24h and no cursor was returned, we have already
// requested the entire time range
if cursor == nil && !exceeds24h {
return return
} }
logger.Debug("processBatch producer - creating work (cursor)") logger.Debug("processBatch producer - creating work (cursor)")
workWg.Add(1) newWork := work{
workCh <- work{
pubsubTopic: w.pubsubTopic, pubsubTopic: w.pubsubTopic,
contentTopics: w.contentTopics, contentTopics: w.contentTopics,
cursor: cursor, cursor: cursor,
limit: nextPageLimit, limit: nextPageLimit,
from: w.from, from: w.from,
to: nextWorkTo, to: w.to,
} }
// If from/to has exceeded the 24h, but there are no more records within the current
// 24h range, then we update the `to` for the new work to not include it.
if cursor == nil && exceeds24h {
newWork.to = from
}
workWg.Add(1)
workCh <- newWork
}(w) }(w)
case err := <-errCh: case err := <-errCh:
logger.Debug("processBatch - received error", zap.Error(err)) logger.Debug("processBatch - received error", zap.Error(err))