mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-05-18 08:19:26 +00:00
stop recv_service from delivering messages on unsubscribed topics for store-recovered messages (#3874)
* fix/harden recv_service so it won't deliver messages on unsubscribed content topics * fix SubscrptionManager's subscribed-content-topics iterator * fix broken store-message-receive test * misc cleanups
This commit is contained in:
parent
f23983f488
commit
cb35b59f95
@ -138,7 +138,20 @@ suite "Messaging API, Receive Service (store recovery)":
|
||||
break
|
||||
await sleepAsync(100.milliseconds)
|
||||
|
||||
# publish before subscriber exists, gets archived
|
||||
# create the subscriber before publishing.
|
||||
# RecvService captures startTimeToCheck at construction time; the
|
||||
# message's timestamp must land after that point to fall inside
|
||||
# checkStore's time window.
|
||||
var subscriber: Waku
|
||||
lockNewGlobalBrokerContext:
|
||||
subscriber = (await createNode(createApiNodeConf(numShards))).expect(
|
||||
"Failed to create subscriber"
|
||||
)
|
||||
(await startWaku(addr subscriber)).expect("Failed to start subscriber")
|
||||
|
||||
# publish after the subscriber exists but before it connects to the
|
||||
# store; the message reaches the archive but the subscriber doesn't
|
||||
# see it via live relay.
|
||||
let missedPayload = "This message was missed".toBytes()
|
||||
let missedMsg = WakuMessage(
|
||||
payload: missedPayload, contentTopic: testTopic, version: 0, timestamp: now()
|
||||
@ -159,15 +172,8 @@ suite "Messaging API, Receive Service (store recovery)":
|
||||
await sleepAsync(100.milliseconds)
|
||||
raiseAssert "Message was not archived in time"
|
||||
|
||||
# create subscriber
|
||||
var subscriber: Waku
|
||||
lockNewGlobalBrokerContext:
|
||||
subscriber = (await createNode(createApiNodeConf(numShards))).expect(
|
||||
"Failed to create subscriber"
|
||||
)
|
||||
(await startWaku(addr subscriber)).expect("Failed to start subscriber")
|
||||
|
||||
# connect subscriber to store (not publisher, so msg won't come via relay to it)
|
||||
# connect subscriber to store after the message is already archived so
|
||||
# gossipsub doesn't replay it via the live path
|
||||
await subscriber.node.connectToNodes(@[storeNodePeerInfo])
|
||||
|
||||
# subscribe to content topic
|
||||
|
||||
@ -70,20 +70,30 @@ proc getMissingMsgsFromStore(
|
||||
)
|
||||
)
|
||||
|
||||
proc processIncomingMessageOfInterest(
|
||||
proc processIncomingMessage(
|
||||
self: RecvService, pubsubTopic: string, message: WakuMessage
|
||||
): bool =
|
||||
## Deduplicate (by hash), store (saves in recently-seen messages) and emit
|
||||
## the MAPI MessageReceivedEvent for every unique incoming message.
|
||||
## Returns true if the message was new and the MessageReceivedEvent was properly emitted.
|
||||
## Return false if the incoming message is from a non-subscribed topic,
|
||||
## or if the message is a duplicate (recently-seen). Otherwise, save it as
|
||||
## recently-seen, emit a MessageReceivedEvent, and return true.
|
||||
|
||||
if not self.subscriptionManager.isSubscribed(pubsubTopic, message.contentTopic):
|
||||
trace "skipping message as I am not subscribed",
|
||||
shard = pubsubTopic, contentTopic = message.contentTopic
|
||||
return false
|
||||
|
||||
let msgHash = computeMessageHash(pubsubTopic, message)
|
||||
if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash):
|
||||
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
|
||||
self.recentReceivedMsgs.add(rxMsg)
|
||||
MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message)
|
||||
return true
|
||||
return false
|
||||
if self.recentReceivedMsgs.anyIt(it.msgHash == msgHash):
|
||||
trace "skipping duplicate message",
|
||||
shard = pubsubTopic,
|
||||
contentTopic = message.contentTopic,
|
||||
msg_hash = msgHash.to0xHex()
|
||||
return false
|
||||
|
||||
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
|
||||
self.recentReceivedMsgs.add(rxMsg)
|
||||
MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message)
|
||||
return true
|
||||
|
||||
proc checkStore*(self: RecvService) {.async.} =
|
||||
## Checks the store for messages that were not received directly and
|
||||
@ -121,7 +131,7 @@ proc checkStore*(self: RecvService) {.async.} =
|
||||
let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes)
|
||||
if missingMsgsRet.isOk():
|
||||
for msgTuple in missingMsgsRet.get():
|
||||
if self.processIncomingMessageOfInterest(msgTuple.pubsubTopic, msgTuple.msg):
|
||||
if self.processIncomingMessage(msgTuple.pubsubTopic, msgTuple.msg):
|
||||
info "recv service store-recovered message",
|
||||
msg_hash = shortLog(msgTuple.hash), pubsubTopic = msgTuple.pubsubTopic
|
||||
else:
|
||||
@ -163,14 +173,7 @@ proc startRecvService*(self: RecvService) =
|
||||
self.seenMsgListener = MessageSeenEvent.listen(
|
||||
self.brokerCtx,
|
||||
proc(event: MessageSeenEvent) {.async: (raises: []).} =
|
||||
if not self.subscriptionManager.isSubscribed(
|
||||
event.topic, event.message.contentTopic
|
||||
):
|
||||
trace "skipping message as I am not subscribed",
|
||||
shard = event.topic, contenttopic = event.message.contentTopic
|
||||
return
|
||||
|
||||
discard self.processIncomingMessageOfInterest(event.topic, event.message),
|
||||
discard self.processIncomingMessage(event.topic, event.message),
|
||||
).valueOr:
|
||||
error "Failed to set MessageSeenEvent listener", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
@ -61,7 +61,16 @@ type SubscriptionManager* = ref object of RootObj
|
||||
iterator subscribedTopics*(
|
||||
self: SubscriptionManager
|
||||
): (PubsubTopic, HashSet[ContentTopic]) =
|
||||
## Iterate over all subscribed content topics, batched per shard.
|
||||
## This is guaranteed to return a non-empty `topics` (content topics) list on iteration.
|
||||
|
||||
for pubsub, topics in self.contentTopicSubs.pairs:
|
||||
# We are iterating over subscribed content topics; if we are subscribed to
|
||||
# a shard but have no subscription (interest) for any content topic in that
|
||||
# shard, then avoid triggering an iteration that doesn't advance the intent
|
||||
# to iterate over content topic subscriptions.
|
||||
if topics.len == 0:
|
||||
continue
|
||||
yield (pubsub, topics)
|
||||
|
||||
proc edgeFilterPeerCount*(sm: SubscriptionManager, shard: PubsubTopic): int =
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user