From fd6370f3fa7e15a62648c8ea7484397dd0b56a95 Mon Sep 17 00:00:00 2001 From: Fabiana Cecin Date: Tue, 24 Feb 2026 19:48:34 -0300 Subject: [PATCH] Implement sub/unsub/receive * Apply insights and constraints from early reviews * Add support for Core/Relay sub/unsub/receive * WIP Edge support (new EdgeDriver placeholder) * Hook MessageReceivedInternalEvent to Kernel API bus * Fix MAPI vs Kernel API unique relay handler support * RecvService delegating topic subs to SubscriptionService * SubscriptionService abstracts Core vs. Edge switching * RecvService emits MessageReceivedEvent (fully filtered) * Delete SubscriptionService.shardSubs * Track relay shard sub with empty contentTopicSubs val * Ensure relay shards never unsubscribed for Core for now --- tests/api/test_api_subscription.nim | 213 +++++++++++++-- waku/api/api.nim | 6 +- waku/events/message_events.nim | 10 +- .../recv_service/recv_service.nim | 79 ++---- .../delivery_service/subscription_service.nim | 248 ++++++++++-------- waku/node/edge_driver.nim | 3 + waku/node/edge_driver/edge_driver.nim | 28 ++ waku/node/kernel_api/relay.nim | 100 ++++--- waku/node/waku_node.nim | 5 + 9 files changed, 472 insertions(+), 220 deletions(-) create mode 100644 waku/node/edge_driver.nim create mode 100644 waku/node/edge_driver/edge_driver.nim diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim index 5e14f88dd..8f2e300a7 100644 --- a/tests/api/test_api_subscription.nim +++ b/tests/api/test_api_subscription.nim @@ -6,10 +6,20 @@ import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto] import ../testlib/[common, wakucore, wakunode, testasync] import - waku, waku/[waku_node, waku_core, common/broker/broker_context, events/message_events] + waku, + waku/[ + waku_node, + waku_core, + common/broker/broker_context, + events/message_events, + waku_relay/protocol, + ] import waku/api/api_conf, waku/factory/waku_conf +# TODO: Edge testing (after EdgeDriver is completed) + const TestTimeout = chronos.seconds(10) +const NegativeTestTimeout = chronos.seconds(2) const DefaultShard = PubsubTopic("/waku/2/rs/1/0") type ReceiveEventListenerManager = ref object @@ -69,26 +79,25 @@ proc setupSubscriberNode(conf: NodeConfig): Future[Waku] {.async.} = (await startWaku(addr node)).expect("Failed to start subscriber node") return node +proc waitForMesh*(node: WakuNode, shard: PubsubTopic) {.async.} = + for _ in 0 ..< 50: + if node.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0: + return + await sleepAsync(100.milliseconds) + raise newException(ValueError, "GossipSub Mesh failed to stabilize") + proc publishWhenMeshReady( publisher: WakuNode, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, payload: seq[byte], - maxRetries: int = 50, - retryDelay: Duration = 200.milliseconds, ): Future[Result[int, string]] {.async.} = - for _ in 0 ..< maxRetries: - let msg = WakuMessage( - payload: payload, contentTopic: contentTopic, version: 0, timestamp: now() - ) + await waitForMesh(publisher, pubsubTopic) - let publishRes = await publisher.publish(some(pubsubTopic), msg) - if publishRes.isOk() and publishRes.value > 0: - return publishRes - - await sleepAsync(retryDelay) - - return err("Timed out waiting for mesh") + let msg = WakuMessage( + payload: payload, contentTopic: contentTopic, version: 0, timestamp: now() + ) + return await publisher.publish(some(pubsubTopic), msg) suite "Messaging API, SubscriptionService": var @@ -111,9 +120,7 @@ suite "Messaging API, SubscriptionService": publisherPeerInfo = publisherNode.peerInfo.toRemotePeerInfo() publisherPeerId = publisherNode.peerInfo.peerId - proc dummyHandler( - topic: PubsubTopic, msg: WakuMessage - ): Future[void] {.async, gcsafe.} = + proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = discard publisherNode.subscribe((kind: PubsubSub, topic: DefaultShard), dummyHandler).expect( @@ -142,19 +149,171 @@ suite "Messaging API, SubscriptionService": defer: eventManager.teardown() - const testMessageStr = "Hello, world!" - let msgPayload = testMessageStr.toBytes() + discard ( + await publishWhenMeshReady( + publisherNode, DefaultShard, testTopic, "Hello, world!".toBytes() + ) + ).expect("Publish failed") + + require await eventManager.waitForEvents(TestTimeout) + require eventManager.receivedMessages.len == 1 + check eventManager.receivedMessages[0].contentTopic == testTopic + + asyncTest "Subscription API, relay node ignores unsubscribed content topics on same shard": + subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) + await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + + let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto") + let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto") + (await subscriberNode.subscribe(subbedTopic)).expect("failed to subscribe") + + let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + defer: + eventManager.teardown() discard ( - await publishWhenMeshReady(publisherNode, DefaultShard, testTopic, msgPayload) - ).expect("Timed out waiting for mesh to stabilize") + await publishWhenMeshReady( + publisherNode, DefaultShard, ignoredTopic, "Ghost Msg".toBytes() + ) + ).expect("Publish failed") - let receivedInTime = await eventManager.waitForEvents(TestTimeout) + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 - # Hard abort if these conditions aren't met to prevent an IndexDefect below - require receivedInTime + asyncTest "Subscription API, relay node unsubscribe stops message receipt": + subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) + await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + let testTopic = ContentTopic("/waku/2/unsub-test/proto") + + (await subscriberNode.subscribe(testTopic)).expect("failed to subscribe") + subscriberNode.unsubscribe(testTopic).expect("failed to unsubscribe") + + let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + defer: + eventManager.teardown() + + discard ( + await publishWhenMeshReady( + publisherNode, DefaultShard, testTopic, "Should be dropped".toBytes() + ) + ).expect("Publish failed") + + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 + + asyncTest "Subscription API, overlapping topics on same shard maintain correct isolation": + subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) + await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + + let topicA = ContentTopic("/waku/2/topic-a/proto") + let topicB = ContentTopic("/waku/2/topic-b/proto") + (await subscriberNode.subscribe(topicA)).expect("failed to sub A") + (await subscriberNode.subscribe(topicB)).expect("failed to sub B") + + let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + defer: + eventManager.teardown() + + await waitForMesh(publisherNode, DefaultShard) + + subscriberNode.unsubscribe(topicA).expect("failed to unsub A") + + discard ( + await publisherNode.publish( + some(DefaultShard), + WakuMessage( + payload: "Dropped Message".toBytes(), + contentTopic: topicA, + version: 0, + timestamp: now(), + ), + ) + ).expect("Publish A failed") + + discard ( + await publisherNode.publish( + some(DefaultShard), + WakuMessage( + payload: "Kept Msg".toBytes(), + contentTopic: topicB, + version: 0, + timestamp: now(), + ), + ) + ).expect("Publish B failed") + + require await eventManager.waitForEvents(TestTimeout) require eventManager.receivedMessages.len == 1 + check eventManager.receivedMessages[0].contentTopic == topicB - let receivedMsg = eventManager.receivedMessages[0] - check receivedMsg.contentTopic == testTopic - check string.fromBytes(receivedMsg.payload) == testMessageStr + asyncTest "Subscription API, redundant subs tolerated and subs are removed": + subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) + await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + let glitchTopic = ContentTopic("/waku/2/glitch/proto") + + (await subscriberNode.subscribe(glitchTopic)).expect("failed to sub") + (await subscriberNode.subscribe(glitchTopic)).expect("failed to double sub") + subscriberNode.unsubscribe(glitchTopic).expect("failed to unsub") + + let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + defer: + eventManager.teardown() + + discard ( + await publishWhenMeshReady( + publisherNode, DefaultShard, glitchTopic, "Ghost Msg".toBytes() + ) + ).expect("Publish failed") + + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 + + asyncTest "Subscription API, resubscribe to an unsubscribed topic": + subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) + await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + let testTopic = ContentTopic("/waku/2/resub-test/proto") + + # Subscribe + (await subscriberNode.subscribe(testTopic)).expect("Initial sub failed") + + var eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + discard ( + await publishWhenMeshReady( + publisherNode, DefaultShard, testTopic, "Msg 1".toBytes() + ) + ).expect("Pub 1 failed") + + require await eventManager.waitForEvents(TestTimeout) + eventManager.teardown() + + # Unsubscribe and verify teardown + subscriberNode.unsubscribe(testTopic).expect("Unsub failed") + eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + + discard ( + await publisherNode.publish( + some(DefaultShard), + WakuMessage( + payload: "Ghost".toBytes(), + contentTopic: testTopic, + version: 0, + timestamp: now(), + ), + ) + ).expect("Ghost pub failed") + + check not await eventManager.waitForEvents(NegativeTestTimeout) + eventManager.teardown() + + # Resubscribe + (await subscriberNode.subscribe(testTopic)).expect("Resub failed") + eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + + discard ( + await publishWhenMeshReady( + publisherNode, DefaultShard, testTopic, "Msg 2".toBytes() + ) + ).expect("Pub 2 failed") + + require await eventManager.waitForEvents(TestTimeout) + check eventManager.receivedMessages[0].payload == "Msg 2".toBytes() diff --git a/waku/api/api.nim b/waku/api/api.nim index 6b2535d4d..f48c39c35 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -53,9 +53,9 @@ proc send*( .valueOr(false) if not isSubbed: info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic - let subRes = w.deliveryService.subscriptionService.subscribe(envelope.contentTopic) - if subRes.isErr(): - warn "Failed to auto-subscribe", error = subRes.error + w.deliveryService.subscriptionService.subscribe(envelope.contentTopic).isOkOr: + warn "Failed to auto-subscribe", error = error + return err("Failed to auto-subscribe before sending: " & error) let requestId = RequestId.new(w.rng) diff --git a/waku/events/message_events.nim b/waku/events/message_events.nim index cf3dac9b7..bd38ee7bc 100644 --- a/waku/events/message_events.nim +++ b/waku/events/message_events.nim @@ -1,6 +1,4 @@ -import waku/common/broker/event_broker -import waku/api/types -import waku/waku_core/message +import waku/[api/types, waku_core/message, waku_core/topics, common/broker/event_broker] export types @@ -28,3 +26,9 @@ EventBroker: type MessageReceivedEvent* = object messageHash*: string message*: WakuMessage + +EventBroker: + # Internal event emitted when a message arrives from the network via any protocol + type MessageReceivedInternalEvent* = object + topic*: PubsubTopic + message*: WakuMessage diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 12780033a..d95870c21 100644 --- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -2,7 +2,7 @@ ## receive and is backed by store-v3 requests to get an additional degree of certainty ## -import std/[tables, sequtils, options] +import std/[tables, sequtils, options, sets] import chronos, chronicles, libp2p/utility import ../[subscription_service] import @@ -13,6 +13,7 @@ import waku_filter_v2/client, waku_core/topics, events/delivery_events, + events/message_events, waku_node, common/broker/broker_context, ] @@ -35,13 +36,8 @@ type RecvMessage = object type RecvService* = ref object of RootObj brokerCtx: BrokerContext - topicsInterest: Table[PubsubTopic, seq[ContentTopic]] - ## Tracks message verification requests and when was the last time a - ## pubsub topic was verified for missing messages - ## The key contains pubsub-topics node: WakuNode - onSubscribeListener: OnFilterSubscribeEventListener - onUnsubscribeListener: OnFilterUnsubscribeEventListener + internalMsgListener: MessageReceivedInternalEventListener subscriptionService: SubscriptionService recentReceivedMsgs: seq[RecvMessage] @@ -95,20 +91,20 @@ proc msgChecker(self: RecvService) {.async.} = self.endTimeToCheck = getNowInNanosecondTime() var msgHashesInStore = newSeq[WakuMessageHash](0) - for pubsubTopic, cTopics in self.topicsInterest.pairs: + for sub in self.subscriptionService.getActiveSubscriptions(): let storeResp: StoreQueryResponse = ( await self.node.wakuStoreClient.queryToAny( StoreQueryRequest( includeData: false, - pubsubTopic: some(PubsubTopic(pubsubTopic)), - contentTopics: cTopics, + pubsubTopic: some(PubsubTopic(sub.pubsubTopic)), + contentTopics: sub.contentTopics, startTime: some(self.startTimeToCheck - DelayExtra.nanos), endTime: some(self.endTimeToCheck + DelayExtra.nanos), ) ) ).valueOr: error "msgChecker failed to get remote msgHashes", - pubsubTopic, cTopics, error = $error + pubsubTopic = sub.pubsubTopic, cTopics = sub.contentTopics, error = $error continue msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash)) @@ -133,29 +129,18 @@ proc msgChecker(self: RecvService) {.async.} = ## update next check times self.startTimeToCheck = self.endTimeToCheck -proc onSubscribe( - self: RecvService, pubsubTopic: string, contentTopics: seq[string] -) {.gcsafe, raises: [].} = - info "onSubscribe", pubsubTopic, contentTopics - self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest): - contentTopicsOfInterest[].add(contentTopics) - do: - self.topicsInterest[pubsubTopic] = contentTopics +proc processIncomingMessageOfInterest( + self: RecvService, pubsubTopic: string, message: WakuMessage +) = + ## Resolve an incoming network message that was already filtered by topic. + ## Deduplicate (by hash), store (saves in recently-seen messages) and emit + ## the MAPI MessageReceivedEvent for every unique incoming message. -proc onUnsubscribe( - self: RecvService, pubsubTopic: string, contentTopics: seq[string] -) {.gcsafe, raises: [].} = - info "onUnsubscribe", pubsubTopic, contentTopics - - self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest): - let remainingCTopics = - contentTopicsOfInterest[].filterIt(not contentTopics.contains(it)) - contentTopicsOfInterest[] = remainingCTopics - - if remainingCTopics.len == 0: - self.topicsInterest.del(pubsubTopic) - do: - error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics + let msgHash = computeMessageHash(pubsubTopic, message) + if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash): + let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) + self.recentReceivedMsgs.add(rxMsg) + MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message) proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T = ## The storeClient will help to acquire any possible missed messages @@ -166,7 +151,6 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T = startTimeToCheck: now, brokerCtx: node.brokerCtx, subscriptionService: s, - topicsInterest: initTable[PubsubTopic, seq[ContentTopic]](), recentReceivedMsgs: @[], ) @@ -175,10 +159,7 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T = pubsubTopic: PubsubTopic, message: WakuMessage ) {.async, closure.} = ## Captures all the messages received through filter - - let msgHash = computeMessageHash(pubSubTopic, message) - let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) - recvService.recentReceivedMsgs.add(rxMsg) + recvService.processIncomingMessageOfInterest(pubSubTopic, message) node.wakuFilterClient.registerPushHandler(filterPushHandler) @@ -194,25 +175,21 @@ proc startRecvService*(self: RecvService) = self.msgCheckerHandler = self.msgChecker() self.msgPrunerHandler = self.loopPruneOldMessages() - self.onSubscribeListener = OnFilterSubscribeEvent.listen( + self.internalMsgListener = MessageReceivedInternalEvent.listen( self.brokerCtx, - proc(subsEv: OnFilterSubscribeEvent) {.async: (raises: []).} = - self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics), - ).valueOr: - error "Failed to set OnFilterSubscribeEvent listener", error = error - quit(QuitFailure) + proc(event: MessageReceivedInternalEvent) {.async: (raises: []).} = + if not self.subscriptionService.isSubscribed( + event.topic, event.message.contentTopic + ): + return - self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen( - self.brokerCtx, - proc(subsEv: OnFilterUnsubscribeEvent) {.async: (raises: []).} = - self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics), + self.processIncomingMessageOfInterest(event.topic, event.message), ).valueOr: - error "Failed to set OnFilterUnsubscribeEvent listener", error = error + error "Failed to set MessageReceivedInternalEvent listener", error = error quit(QuitFailure) proc stopRecvService*(self: RecvService) {.async.} = - OnFilterSubscribeEvent.dropListener(self.brokerCtx, self.onSubscribeListener) - OnFilterUnsubscribeEvent.dropListener(self.brokerCtx, self.onUnsubscribeListener) + MessageReceivedInternalEvent.dropListener(self.brokerCtx, self.internalMsgListener) if not self.msgCheckerHandler.isNil(): await self.msgCheckerHandler.cancelAndWait() if not self.msgPrunerHandler.isNil(): diff --git a/waku/node/delivery_service/subscription_service.nim b/waku/node/delivery_service/subscription_service.nim index 46ac2cb55..f373a206e 100644 --- a/waku/node/delivery_service/subscription_service.nim +++ b/waku/node/delivery_service/subscription_service.nim @@ -4,39 +4,109 @@ import waku_core, waku_core/topics, waku_core/topics/sharding, - events/message_events, waku_node, waku_relay, common/broker/broker_context, + events/delivery_events, + node/edge_driver, ] type SubscriptionService* = ref object of RootObj node: WakuNode - shardSubs: HashSet[PubsubTopic] contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]] - relayHandler: WakuRelayHandler + ## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only. + ## A present key with an empty HashSet value means pubsubtopic already subscribed + ## (via subscribeShard()) but there's no specific content topic interest yet. + filterSubListener: OnFilterSubscribeEventListener + filterUnsubListener: OnFilterUnsubscribeEventListener proc new*(T: typedesc[SubscriptionService], node: WakuNode): T = - let service = SubscriptionService( - node: node, - shardSubs: initHashSet[PubsubTopic](), - contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]](), + SubscriptionService( + node: node, contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]]() ) - service.relayHandler = proc( - topic: PubsubTopic, msg: WakuMessage - ) {.async.} = - if not service.contentTopicSubs.hasKey(topic) or - not service.contentTopicSubs[topic].contains(msg.contentTopic): - return +proc addContentTopicInterest( + self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic +) = + try: + if not self.contentTopicSubs.hasKey(shard): + self.contentTopicSubs[shard] = initHashSet[ContentTopic]() - let msgHash = computeMessageHash(topic, msg).to0xHex() - info "MessageReceivedEvent", - pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash + if not self.contentTopicSubs[shard].contains(topic): + self.contentTopicSubs[shard].incl(topic) - MessageReceivedEvent.emit(service.node.brokerCtx, msgHash, msg) + # Always notify EdgeDriver if filter is mounted + if not isNil(self.node.wakuFilterClient): + self.node.edgeDriver.subscribe(shard, topic) + except KeyError: + discard - return service +proc removeContentTopicInterest( + self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic +) = + try: + if self.contentTopicSubs.hasKey(shard) and + self.contentTopicSubs[shard].contains(topic): + self.contentTopicSubs[shard].excl(topic) + + # Only delete the shard tracking if we are not running a Relay. + # If Relay is mounted, we keep the empty HashSet to signal the relay shard sub. + if self.contentTopicSubs[shard].len == 0 and isNil(self.node.wakuRelay): + self.contentTopicSubs.del(shard) + + if not isNil(self.node.wakuFilterClient): + self.node.edgeDriver.unsubscribe(shard, topic) + except KeyError: + discard + +proc startProvidersAndListeners*(self: SubscriptionService): Result[void, string] = + self.filterSubListener = OnFilterSubscribeEvent.listen( + self.node.brokerCtx, + proc(event: OnFilterSubscribeEvent) {.async: (raises: []), gcsafe.} = + for cTopic in event.contentTopics: + self.addContentTopicInterest(event.pubsubTopic, cTopic), + ).valueOr: + return + err("SubscriptionService failed to listen to OnFilterSubscribeEvent: " & error) + + self.filterUnsubListener = OnFilterUnsubscribeEvent.listen( + self.node.brokerCtx, + proc(event: OnFilterUnsubscribeEvent) {.async: (raises: []), gcsafe.} = + for cTopic in event.contentTopics: + self.removeContentTopicInterest(event.pubsubTopic, cTopic), + ).valueOr: + return + err("SubscriptionService failed to listen to OnFilterUnsubscribeEvent: " & error) + + return ok() + +proc stopProvidersAndListeners*(self: SubscriptionService) = + OnFilterSubscribeEvent.dropListener(self.node.brokerCtx, self.filterSubListener) + OnFilterUnsubscribeEvent.dropListener(self.node.brokerCtx, self.filterUnsubListener) + +proc start*(self: SubscriptionService) = + self.startProvidersAndListeners().isOkOr: + error "Fatal error in SubscriptionService.startProvidersAndListeners(): ", + error = error + raise newException(ValueError, "SubscriptionService.start() failed: " & error) + +proc stop*(self: SubscriptionService) = + self.stopProvidersAndListeners() + +proc getActiveSubscriptions*( + self: SubscriptionService +): seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] = + var activeSubs: seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] = + @[] + + for pubsub, cTopicSet in self.contentTopicSubs.pairs: + if cTopicSet.len > 0: + var cTopicSeq = newSeqOfCap[ContentTopic](cTopicSet.len) + for t in cTopicSet: + cTopicSeq.add(t) + activeSubs.add((pubsub, cTopicSeq)) + + return activeSubs proc getShardForContentTopic( self: SubscriptionService, topic: ContentTopic @@ -45,123 +115,89 @@ proc getShardForContentTopic( let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic) return ok($shardObj) - return - err("Manual sharding is not supported in this API. Autosharding must be enabled.") - -proc doSubscribe(self: SubscriptionService, shard: PubsubTopic): Result[void, string] = - self.node.subscribe((kind: PubsubSub, topic: shard), self.relayHandler).isOkOr: - error "Failed to subscribe to Relay shard", shard = shard, error = error - return err("Failed to subscribe: " & error) - return ok() - -proc doUnsubscribe( - self: SubscriptionService, shard: PubsubTopic -): Result[void, string] = - self.node.unsubscribe((kind: PubsubUnsub, topic: shard)).isOkOr: - error "Failed to unsubscribe from Relay shard", shard = shard, error = error - return err("Failed to unsubscribe: " & error) - return ok() + return err("SubscriptionService requires AutoSharding") proc isSubscribed*( self: SubscriptionService, topic: ContentTopic ): Result[bool, string] = - if self.node.wakuRelay.isNil(): - return err("SubscriptionService currently only supports Relay (Core) mode.") - let shard = ?self.getShardForContentTopic(topic) + return ok( + self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(topic) + ) - if self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains( - topic - ): - return ok(true) - - return ok(false) - -proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] = - if self.node.wakuRelay.isNil(): - return err("SubscriptionService currently only supports Relay (Core) mode.") - - let shard = ?self.getShardForContentTopic(topic) - - let needShardSub = - not self.shardSubs.contains(shard) and not self.contentTopicSubs.hasKey(shard) - - if needShardSub: - ?self.doSubscribe(shard) - - self.contentTopicSubs.mgetOrPut(shard, initHashSet[ContentTopic]()).incl(topic) - - return ok() - -proc unsubscribe*( - self: SubscriptionService, topic: ContentTopic -): Result[void, string] = - if self.node.wakuRelay.isNil(): - return err("SubscriptionService currently only supports Relay (Core) mode.") - - let shard = ?self.getShardForContentTopic(topic) - - if self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains( - topic - ): - let isLastTopic = self.contentTopicSubs[shard].len == 1 - let needShardUnsub = isLastTopic and not self.shardSubs.contains(shard) - - if needShardUnsub: - ?self.doUnsubscribe(shard) - - self.contentTopicSubs[shard].excl(topic) - if self.contentTopicSubs[shard].len == 0: - self.contentTopicSubs.del(shard) - - return ok() +proc isSubscribed*( + self: SubscriptionService, shard: PubsubTopic, contentTopic: ContentTopic +): bool {.raises: [].} = + try: + return + self.contentTopicSubs.hasKey(shard) and + self.contentTopicSubs[shard].contains(contentTopic) + except KeyError: + discard proc subscribeShard*( self: SubscriptionService, shards: seq[PubsubTopic] ): Result[void, string] = - if self.node.wakuRelay.isNil(): - return err("SubscriptionService currently only supports Relay (Core) mode.") + if isNil(self.node.wakuRelay): + return err("subscribeShard requires a Relay") var errors: seq[string] = @[] for shard in shards: - if not self.shardSubs.contains(shard): - let needShardSub = not self.contentTopicSubs.hasKey(shard) + if not self.contentTopicSubs.hasKey(shard): + self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr: + errors.add("shard " & shard & ": " & error) + continue - if needShardSub: - let res = self.doSubscribe(shard) - if res.isErr(): - errors.add("Shard " & shard & " failed: " & res.error) - continue - - self.shardSubs.incl(shard) + self.contentTopicSubs[shard] = initHashSet[ContentTopic]() if errors.len > 0: - return err("Batch subscribe had errors: " & errors.join("; ")) + return err("subscribeShard errors: " & errors.join("; ")) return ok() proc unsubscribeShard*( self: SubscriptionService, shards: seq[PubsubTopic] ): Result[void, string] = - if self.node.wakuRelay.isNil(): - return err("SubscriptionService currently only supports Relay (Core) mode.") + if isNil(self.node.wakuRelay): + return err("unsubscribeShard requires a Relay") var errors: seq[string] = @[] for shard in shards: - if self.shardSubs.contains(shard): - let needShardUnsub = not self.contentTopicSubs.hasKey(shard) + if self.contentTopicSubs.hasKey(shard): + self.node.unsubscribe((kind: PubsubUnsub, topic: shard)).isOkOr: + errors.add("shard " & shard & ": " & error) - if needShardUnsub: - let res = self.doUnsubscribe(shard) - if res.isErr(): - errors.add("Shard " & shard & " failed: " & res.error) - continue - - self.shardSubs.excl(shard) + self.contentTopicSubs.del(shard) if errors.len > 0: - return err("Batch unsubscribe had errors: " & errors.join("; ")) + return err("unsubscribeShard errors: " & errors.join("; ")) + + return ok() + +proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] = + if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient): + return err("SubscriptionService requires either Relay or Filter Client.") + + let shard = ?self.getShardForContentTopic(topic) + + if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard): + ?self.subscribeShard(@[shard]) + + self.addContentTopicInterest(shard, topic) + + return ok() + +proc unsubscribe*( + self: SubscriptionService, topic: ContentTopic +): Result[void, string] = + if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient): + return err("SubscriptionService requires either Relay or Filter Client.") + + let shard = ?self.getShardForContentTopic(topic) + + if self.isSubscribed(shard, topic): + self.removeContentTopicInterest(shard, topic) return ok() diff --git a/waku/node/edge_driver.nim b/waku/node/edge_driver.nim new file mode 100644 index 000000000..582cc6319 --- /dev/null +++ b/waku/node/edge_driver.nim @@ -0,0 +1,3 @@ +import ./edge_driver/edge_driver + +export edge_driver diff --git a/waku/node/edge_driver/edge_driver.nim b/waku/node/edge_driver/edge_driver.nim new file mode 100644 index 000000000..93e47350d --- /dev/null +++ b/waku/node/edge_driver/edge_driver.nim @@ -0,0 +1,28 @@ +{.push raises: [].} + +import chronicles, waku/waku_core/topics + +# Plan: +# - drive the continuous fulfillment and healing of edge peering and topic subscriptions +# - offload the edgeXXX stuff from WakuNode into this and finish it + +type EdgeDriver* = ref object of RootObj # TODO: bg worker, ... + +proc new*(T: typedesc[EdgeDriver]): T = + return EdgeDriver() + +proc start*(self: EdgeDriver) = + # TODO + debug "TODO: EdgeDriver: start bg worker" + +proc stop*(self: EdgeDriver) = + # TODO + debug "TODO: EdgeDriver: stop bg worker" + +proc subscribe*(self: EdgeDriver, shard: PubsubTopic, topic: ContentTopic) = + # TODO: this is an event that can be used to drive an event-driven edge health checker + debug "TODO: EdgeDriver: got subscribe notification", shard = shard, topic = topic + +proc unsubscribe*(self: EdgeDriver, shard: PubsubTopic, topic: ContentTopic) = + # TODO: this is an event that can be used to drive an event-driven edge health checker + debug "TODO: EdgeDriver: got unsubscribe notification", shard = shard, topic = topic diff --git a/waku/node/kernel_api/relay.nim b/waku/node/kernel_api/relay.nim index a0a128449..6a49c72ca 100644 --- a/waku/node/kernel_api/relay.nim +++ b/waku/node/kernel_api/relay.nim @@ -19,16 +19,20 @@ import libp2p/utility import - ../waku_node, - ../../waku_relay, - ../../waku_core, - ../../waku_core/topics/sharding, - ../../waku_filter_v2, - ../../waku_archive_legacy, - ../../waku_archive, - ../../waku_store_sync, - ../peer_manager, - ../../waku_rln_relay + waku/[ + waku_relay, + waku_core, + waku_core/topics/sharding, + waku_filter_v2, + waku_archive_legacy, + waku_archive, + waku_store_sync, + waku_rln_relay, + node/waku_node, + node/peer_manager, + common/broker/broker_context, + events/message_events, + ] export waku_relay.WakuRelayHandler @@ -44,14 +48,25 @@ logScope: ## Waku relay proc registerRelayHandler( - node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler -) = + node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler = nil +): bool = ## Registers the only handler for the given topic. ## Notice that this handler internally calls other handlers, such as filter, ## archive, etc, plus the handler provided by the application. + ## Returns `true` if a mesh subscription was created or `false` if the relay + ## was already subscribed to the topic. - if node.wakuRelay.isSubscribed(topic): - return + let alreadySubscribed = node.wakuRelay.isSubscribed(topic) + + if not appHandler.isNil(): + if not alreadySubscribed or not node.legacyAppHandlers.hasKey(topic): + node.legacyAppHandlers[topic] = appHandler + else: + debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler", + topic = topic + + if alreadySubscribed: + return false proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = let msgSizeKB = msg.payload.len / 1000 @@ -82,6 +97,9 @@ proc registerRelayHandler( node.wakuStoreReconciliation.messageIngress(topic, msg) + proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + MessageReceivedInternalEvent.emit(node.brokerCtx, topic, msg) + let uniqueTopicHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = @@ -89,7 +107,15 @@ proc registerRelayHandler( await filterHandler(topic, msg) await archiveHandler(topic, msg) await syncHandler(topic, msg) - await appHandler(topic, msg) + await internalHandler(topic, msg) + + # Call the legacy (kernel API) app handler if it exists. + # Normally, hasKey is false and the MessageReceivedInternalEvent bus (new API) is used instead. + # But we need to support legacy behavior (kernel API use), hence this. + # NOTE: We can delete `legacyAppHandlers` if instead we refactor WakuRelay to support multiple + # PubsubTopic handlers, since that's actually supported by libp2p PubSub (bigger refactor...) + if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil(): + await node.legacyAppHandlers[topic](topic, msg) node.wakuRelay.subscribe(topic, uniqueTopicHandler) @@ -115,8 +141,11 @@ proc subscribe*( ): Result[void, string] = ## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on ## this topic. WakuRelayHandler is a method that takes a topic and a Waku message. + ## If `handler` is nil, the API call will subscribe to the topic in the relay mesh + ## but no app handler will be registered at this time (it can be registered later with + ## another call to this proc for the same gossipsub topic). - if node.wakuRelay.isNil(): + if isNil(node.wakuRelay): error "Invalid API call to `subscribe`. WakuRelay not mounted." return err("Invalid API call to `subscribe`. WakuRelay not mounted.") @@ -124,13 +153,15 @@ proc subscribe*( error "Failed to decode subscription event", error = error return err("Failed to decode subscription event: " & error) - if node.wakuRelay.isSubscribed(pubsubTopic): - warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic - return ok() - - info "subscribe", pubsubTopic, contentTopicOp - node.registerRelayHandler(pubsubTopic, handler) - node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) + if node.registerRelayHandler(pubsubTopic, handler): + info "subscribe", pubsubTopic, contentTopicOp + node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) + else: + if isNil(handler): + warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic + else: + info "subscribe (was already subscribed in the mesh; appHandler set)", + pubsubTopic = pubsubTopic return ok() @@ -138,8 +169,10 @@ proc unsubscribe*( node: WakuNode, subscription: SubscriptionEvent ): Result[void, string] = ## Unsubscribes from a specific PubSub or Content topic. + ## This will both unsubscribe from the relay mesh and remove the app handler, if any. + ## NOTE: This works because using MAPI and Kernel API at the same time is unsupported. - if node.wakuRelay.isNil(): + if isNil(node.wakuRelay): error "Invalid API call to `unsubscribe`. WakuRelay not mounted." return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.") @@ -147,13 +180,20 @@ proc unsubscribe*( error "Failed to decode unsubscribe event", error = error return err("Failed to decode unsubscribe event: " & error) - if not node.wakuRelay.isSubscribed(pubsubTopic): - warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic - return ok() + let hadHandler = node.legacyAppHandlers.hasKey(pubsubTopic) + if hadHandler: + node.legacyAppHandlers.del(pubsubTopic) - info "unsubscribe", pubsubTopic, contentTopicOp - node.wakuRelay.unsubscribe(pubsubTopic) - node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) + if node.wakuRelay.isSubscribed(pubsubTopic): + info "unsubscribe", pubsubTopic, contentTopicOp + node.wakuRelay.unsubscribe(pubsubTopic) + node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) + else: + if not hadHandler: + warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic + else: + info "unsubscribe (was not subscribed in the mesh; appHandler removed)", + pubsubTopic = pubsubTopic return ok() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 53ce0349a..252fbbc73 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -68,6 +68,7 @@ import ], ./net_config, ./peer_manager, + ./edge_driver, ./health_monitor/health_status, ./health_monitor/topic_health @@ -113,6 +114,7 @@ type WakuNode* = ref object peerManager*: PeerManager switch*: Switch + edgeDriver*: EdgeDriver wakuRelay*: WakuRelay wakuArchive*: waku_archive.WakuArchive wakuLegacyArchive*: waku_archive_legacy.WakuArchive @@ -144,6 +146,8 @@ type started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] rateLimitSettings*: ProtocolRateLimitSettings + legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler] + ## Kernel API Relay appHandlers (if any) wakuMix*: WakuMix edgeTopicsHealth*: Table[PubsubTopic, TopicHealth] edgeHealthEvent*: AsyncEvent @@ -227,6 +231,7 @@ proc new*( let node = WakuNode( peerManager: peerManager, switch: switch, + edgeDriver: EdgeDriver.new(), rng: rng, brokerCtx: brokerCtx, enr: enr,