mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-04-14 04:03:20 +00:00
Fixes from Ivan and Zoltan reviews
* process store results per subscribed topic instead of batching all * revert CI build fixes (moved to separate PR)
This commit is contained in:
parent
9f2346bd24
commit
db9c460b13
@ -90,7 +90,7 @@ proc checkStore*(self: RecvService) {.async.} =
|
||||
## delivers them via MessageReceivedEvent.
|
||||
self.endTimeToCheck = getNowInNanosecondTime()
|
||||
|
||||
var msgHashesInStore = newSeq[WakuMessageHash](0)
|
||||
## query store and deliver new recovered messages per subscribed topic
|
||||
for pubsubTopic, contentTopics in self.subscriptionManager.subscribedTopics:
|
||||
let storeResp: StoreQueryResponse = (
|
||||
await self.node.wakuStoreClient.queryToAny(
|
||||
@ -107,22 +107,21 @@ proc checkStore*(self: RecvService) {.async.} =
|
||||
pubsubTopic = pubsubTopic, cTopics = toSeq(contentTopics), error = $error
|
||||
continue
|
||||
|
||||
msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash))
|
||||
## compare the msgHashes seen from the store vs the ones received directly
|
||||
let msgHashesInStore = storeResp.messages.mapIt(it.messageHash)
|
||||
let rxMsgHashes = self.recentReceivedMsgs.mapIt(it.msgHash)
|
||||
let missedHashes: seq[WakuMessageHash] =
|
||||
msgHashesInStore.filterIt(not rxMsgHashes.contains(it))
|
||||
|
||||
## compare the msgHashes seen from the store vs the ones received directly
|
||||
let rxMsgHashes = self.recentReceivedMsgs.mapIt(it.msgHash)
|
||||
let missedHashes: seq[WakuMessageHash] =
|
||||
msgHashesInStore.filterIt(not rxMsgHashes.contains(it))
|
||||
|
||||
## Now retrieve the missing WakuMessages and deliver them
|
||||
let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes)
|
||||
if missingMsgsRet.isOk():
|
||||
for msgTuple in missingMsgsRet.get():
|
||||
if self.processIncomingMessageOfInterest(msgTuple.pubsubTopic, msgTuple.msg):
|
||||
info "recv service store-recovered message",
|
||||
msg_hash = shortLog(msgTuple.hash), pubsubTopic = msgTuple.pubsubTopic
|
||||
else:
|
||||
error "failed to retrieve missing messages: ", error = $missingMsgsRet.error
|
||||
## Now retrieve the missing WakuMessages and deliver them
|
||||
let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes)
|
||||
if missingMsgsRet.isOk():
|
||||
for msgTuple in missingMsgsRet.get():
|
||||
if self.processIncomingMessageOfInterest(msgTuple.pubsubTopic, msgTuple.msg):
|
||||
info "recv service store-recovered message",
|
||||
msg_hash = shortLog(msgTuple.hash), pubsubTopic = msgTuple.pubsubTopic
|
||||
else:
|
||||
error "failed to retrieve missing messages: ", error = $missingMsgsRet.error
|
||||
|
||||
## update next check times
|
||||
self.startTimeToCheck = self.endTimeToCheck
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user