nwaku/waku/node/delivery_monitor/recv_monitor.nim
Ivan FB 0f68274c85
chore: delivery monitor for store v3 reliability protocol (#2977)
- Use of observer observable pattern to inform delivery_monitor about subscription state
- send_monitor becomes a publish observer of lightpush and relay
- deliver monitor add more protection against possible crash and better logs
- creating a separate proc in store client for delivery monitor
2024-08-27 16:49:46 +02:00

197 lines
6.9 KiB
Nim

## This module is in charge of taking care of the messages that this node is expecting to
## receive and is backed by store-v3 requests to get an additional degree of certainty
##
import std/[tables, sequtils, sets, options]
import chronos, chronicles, libp2p/utility
import
../../waku_core,
./delivery_callback,
./subscriptions_observer,
../../waku_store/[client, common],
../../waku_filter_v2/client,
../../waku_core/topics
const StoreCheckPeriod = chronos.minutes(5) ## How often to perform store queries
const MaxMessageLife = chronos.minutes(7) ## Max time we will keep track of rx messages
const PruneOldMsgsPeriod = chronos.minutes(1)
const DelayExtra* = chronos.seconds(5)
## Additional security time to overlap the missing messages queries
type TupleHashAndMsg = tuple[hash: WakuMessageHash, msg: WakuMessage]
type RecvMessage = object
msgHash: WakuMessageHash
rxTime: Timestamp
## timestamp of the rx message. We will not keep the rx messages forever
type RecvMonitor* = ref object of SubscriptionObserver
topicsInterest: Table[PubsubTopic, seq[ContentTopic]]
## Tracks message verification requests and when was the last time a
## pubsub topic was verified for missing messages
## The key contains pubsub-topics
storeClient: WakuStoreClient
deliveryCb: DeliveryFeedbackCallback
recentReceivedMsgs: seq[RecvMessage]
msgCheckerHandler: Future[void] ## allows to stop the msgChecker async task
msgPrunerHandler: Future[void] ## removes too old messages
startTimeToCheck: Timestamp
endTimeToCheck: Timestamp
proc getMissingMsgsFromStore(
self: RecvMonitor, msgHashes: seq[WakuMessageHash]
): Future[Result[seq[TupleHashAndMsg], string]] {.async.} =
let storeResp: StoreQueryResponse = (
await self.storeClient.queryToAny(
StoreQueryRequest(includeData: true, messageHashes: msgHashes)
)
).valueOr:
return err("getMissingMsgsFromStore: " & $error)
let otherwiseMsg = WakuMessage()
## message to be returned if the Option message is none
return ok(
storeResp.messages.mapIt((hash: it.messageHash, msg: it.message.get(otherwiseMsg)))
)
proc performDeliveryFeedback(
self: RecvMonitor,
success: DeliverySuccess,
dir: DeliveryDirection,
comment: string,
msgHash: WakuMessageHash,
msg: WakuMessage,
) {.gcsafe, raises: [].} =
## This procs allows to bring delivery feedback to the API client
## It requires a 'deliveryCb' to be registered beforehand.
if self.deliveryCb.isNil():
error "deliveryCb is nil in performDeliveryFeedback",
success, dir, comment, msg_hash
return
debug "recv monitor performDeliveryFeedback",
success, dir, comment, msg_hash = shortLog(msgHash)
self.deliveryCb(success, dir, comment, msgHash, msg)
proc msgChecker(self: RecvMonitor) {.async.} =
## Continuously checks if a message has been received
while true:
await sleepAsync(StoreCheckPeriod)
self.endTimeToCheck = getNowInNanosecondTime()
var msgHashesInStore = newSeq[WakuMessageHash](0)
for pubsubTopic, cTopics in self.topicsInterest.pairs:
let storeResp: StoreQueryResponse = (
await self.storeClient.queryToAny(
StoreQueryRequest(
includeData: false,
pubsubTopic: some(PubsubTopic(pubsubTopic)),
contentTopics: cTopics,
startTime: some(self.startTimeToCheck - DelayExtra.nanos),
endTime: some(self.endTimeToCheck + DelayExtra.nanos),
)
)
).valueOr:
error "msgChecker failed to get remote msgHashes",
pubsubTopic, cTopics, error = $error
continue
msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash))
## compare the msgHashes seen from the store vs the ones received directly
let rxMsgHashes = self.recentReceivedMsgs.mapIt(it.msgHash)
let missedHashes: seq[WakuMessageHash] =
msgHashesInStore.filterIt(not rxMsgHashes.contains(it))
## Now retrieve the missed WakuMessages
let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes)
if missingMsgsRet.isOk():
## Give feedback so that the api client can perfom any action with the missed messages
for msgTuple in missingMsgsRet.get():
self.performDeliveryFeedback(
DeliverySuccess.UNSUCCESSFUL, RECEIVING, "Missed message", msgTuple.hash,
msgTuple.msg,
)
else:
error "failed to retrieve missing messages: ", error = $missingMsgsRet.error
## update next check times
self.startTimeToCheck = self.endTimeToCheck
method onSubscribe(
self: RecvMonitor, pubsubTopic: string, contentTopics: seq[string]
) {.gcsafe, raises: [].} =
debug "onSubscribe", pubsubTopic, contentTopics
self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest):
contentTopicsOfInterest[].add(contentTopics)
do:
self.topicsInterest[pubsubTopic] = contentTopics
method onUnsubscribe(
self: RecvMonitor, pubsubTopic: string, contentTopics: seq[string]
) {.gcsafe, raises: [].} =
debug "onUnsubscribe", pubsubTopic, contentTopics
self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest):
let remainingCTopics =
contentTopicsOfInterest[].filterIt(not contentTopics.contains(it))
contentTopicsOfInterest[] = remainingCTopics
if remainingCTopics.len == 0:
self.topicsInterest.del(pubsubTopic)
do:
error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics
proc new*(
T: type RecvMonitor,
storeClient: WakuStoreClient,
wakuFilterClient: WakuFilterClient,
): T =
## The storeClient will help to acquire any possible missed messages
let now = getNowInNanosecondTime()
var recvMonitor = RecvMonitor(storeClient: storeClient, startTimeToCheck: now)
if not wakuFilterClient.isNil():
wakuFilterClient.addSubscrObserver(recvMonitor)
let filterPushHandler = proc(
pubsubTopic: PubsubTopic, message: WakuMessage
) {.async, closure.} =
## Captures all the messages recived through filter
let msgHash = computeMessageHash(pubSubTopic, message)
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
recvMonitor.recentReceivedMsgs.add(rxMsg)
wakuFilterClient.registerPushHandler(filterPushHandler)
return recvMonitor
proc loopPruneOldMessages(self: RecvMonitor) {.async.} =
while true:
let oldestAllowedTime = getNowInNanosecondTime() - MaxMessageLife.nanos
self.recentReceivedMsgs.keepItIf(it.rxTime > oldestAllowedTime)
await sleepAsync(PruneOldMsgsPeriod)
proc startRecvMonitor*(self: RecvMonitor) =
self.msgCheckerHandler = self.msgChecker()
self.msgPrunerHandler = self.loopPruneOldMessages()
proc stopRecvMonitor*(self: RecvMonitor) {.async.} =
if not self.msgCheckerHandler.isNil():
await self.msgCheckerHandler.cancelAndWait()
if not self.msgPrunerHandler.isNil():
await self.msgPrunerHandler.cancelAndWait()
proc setDeliveryCallback*(self: RecvMonitor, deliveryCb: DeliveryFeedbackCallback) =
self.deliveryCb = deliveryCb