mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-05-11 21:09:27 +00:00
add temporary debug logs recv_monitor
This commit is contained in:
parent
55c38c5070
commit
cd898d0f72
@ -114,6 +114,15 @@ proc msgChecker(self: RecvService) {.async.} =
|
||||
let missedHashes: seq[WakuMessageHash] =
|
||||
msgHashesInStore.filterIt(not rxMsgHashes.contains(it))
|
||||
|
||||
debug "AAAA msgChecker delivery discrepancy",
|
||||
store_hashes = msgHashesInStore.len,
|
||||
rx_hashes = rxMsgHashes.len,
|
||||
missed_hashes = missedHashes.len
|
||||
|
||||
if missedHashes.len > 0:
|
||||
debug "AAAA msgChecker detected missed hashes",
|
||||
missed_hashes = missedHashes.len, missed_hash_list = missedHashes
|
||||
|
||||
## Now retrieve the missed WakuMessages
|
||||
let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes)
|
||||
if missingMsgsRet.isOk():
|
||||
@ -137,10 +146,30 @@ proc processIncomingMessageOfInterest(
|
||||
## the MAPI MessageReceivedEvent for every unique incoming message.
|
||||
|
||||
let msgHash = computeMessageHash(pubsubTopic, message)
|
||||
if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash):
|
||||
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
|
||||
self.recentReceivedMsgs.add(rxMsg)
|
||||
MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message)
|
||||
debug "AAAA processIncomingMessageOfInterest received event",
|
||||
topic = pubsubTopic,
|
||||
contenttopic = message.contentTopic,
|
||||
msg_hash = shortLog(msgHash)
|
||||
|
||||
if self.recentReceivedMsgs.anyIt(it.msgHash == msgHash):
|
||||
debug "AAAA processIncomingMessageOfInterest duplicate message skipped",
|
||||
topic = pubsubTopic,
|
||||
contenttopic = message.contentTopic,
|
||||
msg_hash = shortLog(msgHash)
|
||||
return
|
||||
|
||||
let prevHashes = self.recentReceivedMsgs.mapIt(it.msgHash)
|
||||
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
|
||||
self.recentReceivedMsgs.add(rxMsg)
|
||||
debug "AAAA recentReceivedMsgs added new message",
|
||||
topic = pubsubTopic,
|
||||
contenttopic = message.contentTopic,
|
||||
added_msg_hash = shortLog(msgHash),
|
||||
prev_entries = prevHashes.len,
|
||||
now_entries = self.recentReceivedMsgs.len,
|
||||
recent_hashes = self.recentReceivedMsgs.mapIt(it.msgHash)
|
||||
|
||||
MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message)
|
||||
|
||||
proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T =
|
||||
## The storeClient will help to acquire any possible missed messages
|
||||
@ -159,7 +188,16 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T =
|
||||
proc loopPruneOldMessages(self: RecvService) {.async.} =
|
||||
while true:
|
||||
let oldestAllowedTime = getNowInNanosecondTime() - MaxMessageLife.nanos
|
||||
let prevHashes = self.recentReceivedMsgs.mapIt(it.msgHash)
|
||||
let prevEntries = prevHashes.len
|
||||
let hashesBeforePrune = self.recentReceivedMsgs.mapIt(it.msgHash)
|
||||
self.recentReceivedMsgs.keepItIf(it.rxTime > oldestAllowedTime)
|
||||
debug "AAAA loopPruneOldMessages",
|
||||
before_entries = prevEntries,
|
||||
after_entries = self.recentReceivedMsgs.len,
|
||||
pruned_entries = prevEntries - self.recentReceivedMsgs.len,
|
||||
remaining_hashes = self.recentReceivedMsgs.mapIt(it.msgHash),
|
||||
hashesBeforePrune = hashesBeforePrune
|
||||
await sleepAsync(PruneOldMsgsPeriod)
|
||||
|
||||
proc startRecvService*(self: RecvService) =
|
||||
@ -169,13 +207,31 @@ proc startRecvService*(self: RecvService) =
|
||||
self.seenMsgListener = MessageSeenEvent.listen(
|
||||
self.brokerCtx,
|
||||
proc(event: MessageSeenEvent) {.async: (raises: []).} =
|
||||
if not self.subscriptionManager.isSubscribed(
|
||||
event.topic, event.message.contentTopic
|
||||
):
|
||||
trace "skipping message as I am not subscribed",
|
||||
shard = event.topic, contenttopic = event.message.contentTopic
|
||||
let eventHash = computeMessageHash(event.topic, event.message)
|
||||
debug "AAAA MessageSeenEvent listener received event",
|
||||
topic = event.topic,
|
||||
contenttopic = event.message.contentTopic,
|
||||
msg_hash = shortLog(eventHash),
|
||||
timestamp = event.message.timestamp
|
||||
|
||||
let subscribed =
|
||||
self.subscriptionManager.isSubscribed(event.topic, event.message.contentTopic)
|
||||
debug "AAAA MessageSeenEvent subscription check",
|
||||
topic = event.topic,
|
||||
contenttopic = event.message.contentTopic,
|
||||
subscribed = subscribed
|
||||
|
||||
if not subscribed:
|
||||
debug "AAAA skipping MessageSeenEvent because not subscribed",
|
||||
shard = event.topic,
|
||||
contenttopic = event.message.contentTopic,
|
||||
msg_hash = shortLog(eventHash)
|
||||
return
|
||||
|
||||
debug "AAAA MessageSeenEvent accepted for processing",
|
||||
topic = event.topic,
|
||||
contenttopic = event.message.contentTopic,
|
||||
msg_hash = shortLog(eventHash)
|
||||
self.processIncomingMessageOfInterest(event.topic, event.message),
|
||||
).valueOr:
|
||||
error "Failed to set MessageSeenEvent listener", error = error
|
||||
|
||||
@ -92,6 +92,12 @@ proc registerRelayHandler(
|
||||
node.wakuStoreReconciliation.messageIngress(topic, msg)
|
||||
|
||||
proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
let msgHash = topic.computeMessageHash(msg)
|
||||
debug "AAAA relay internalHandler emitting MessageSeenEvent",
|
||||
topic = topic,
|
||||
contenttopic = msg.contentTopic,
|
||||
msg_hash = shortLog(msgHash),
|
||||
payload_len = msg.payload.len
|
||||
MessageSeenEvent.emit(node.brokerCtx, topic, msg)
|
||||
|
||||
let uniqueTopicHandler = proc(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user