From 964f0e131a8dea367bbc81971436af4f2cd4a658 Mon Sep 17 00:00:00 2001 From: Fabiana Cecin Date: Fri, 27 Feb 2026 10:46:56 -0300 Subject: [PATCH] Fixes from pair session with Ivan --- tests/api/test_api_subscription.nim | 2 +- waku/events/message_events.nim | 2 +- waku/factory/waku.nim | 31 --- .../delivery_service/delivery_service.nim | 4 +- .../recv_service/recv_service.nim | 10 +- .../delivery_service/subscription_service.nim | 184 ++++++++---------- waku/node/edge_driver.nim | 3 - waku/node/edge_driver/edge_driver.nim | 28 --- waku/node/kernel_api/relay.nim | 4 +- waku/node/waku_node.nim | 3 - 10 files changed, 88 insertions(+), 183 deletions(-) delete mode 100644 waku/node/edge_driver.nim delete mode 100644 waku/node/edge_driver/edge_driver.nim diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim index 8f2e300a7..770dcd12d 100644 --- a/tests/api/test_api_subscription.nim +++ b/tests/api/test_api_subscription.nim @@ -16,7 +16,7 @@ import ] import waku/api/api_conf, waku/factory/waku_conf -# TODO: Edge testing (after EdgeDriver is completed) +# TODO: Edge testing (after MAPI edge support is completed) const TestTimeout = chronos.seconds(10) const NegativeTestTimeout = chronos.seconds(2) diff --git a/waku/events/message_events.nim b/waku/events/message_events.nim index bd38ee7bc..677a4a433 100644 --- a/waku/events/message_events.nim +++ b/waku/events/message_events.nim @@ -29,6 +29,6 @@ EventBroker: EventBroker: # Internal event emitted when a message arrives from the network via any protocol - type MessageReceivedInternalEvent* = object + type MessageSeenEvent* = object topic*: PubsubTopic message*: WakuMessage diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 1af011475..c46f92fa8 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -420,37 +420,6 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: if not waku[].deliveryService.isNil(): 
waku[].deliveryService.startDeliveryService() - ## Subscription Service - if not waku.node.wakuRelay.isNil() and not waku.deliveryService.isNil(): - let subService = waku.deliveryService.subscriptionService - - if waku.node.wakuAutoSharding.isSome(): - # Subscribe relay to all shards in autosharding. - let autoSharding = waku.node.wakuAutoSharding.get() - let clusterId = autoSharding.clusterId - let numShards = autoSharding.shardCountGenZero - - if numShards > 0: - var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards) - - for i in 0 ..< numShards: - let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i)) - clusterPubsubTopics.add(PubsubTopic($shardObj)) - - subService.subscribeShard(clusterPubsubTopics).isOkOr: - return err("Failed to auto-subscribe Relay to cluster shards: " & error) - else: - # Fallback to configured shards when no autosharding. - if waku.conf.subscribeShards.len > 0: - let manualShards = waku.conf.subscribeShards.mapIt( - PubsubTopic( - $(RelayShard(clusterId: waku.conf.clusterId, shardId: uint16(it))) - ) - ) - - subService.subscribeShard(manualShards).isOkOr: - return err("Failed to subscribe Relay to manual shards: " & error) - ## Health Monitor waku[].healthMonitor.startHealthMonitor().isOkOr: return err("failed to start health monitor: " & $error) diff --git a/waku/node/delivery_service/delivery_service.nim b/waku/node/delivery_service/delivery_service.nim index 0cc951c58..f8b275fe7 100644 --- a/waku/node/delivery_service/delivery_service.nim +++ b/waku/node/delivery_service/delivery_service.nim @@ -38,9 +38,11 @@ proc new*( ) proc startDeliveryService*(self: DeliveryService) = - self.sendService.startSendService() + self.subscriptionService.startSubscriptionService() self.recvService.startRecvService() + self.sendService.startSendService() proc stopDeliveryService*(self: DeliveryService) {.async.} = await self.sendService.stopSendService() await self.recvService.stopRecvService() + await 
self.subscriptionService.stopSubscriptionService() diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 708e19022..36a63e3c2 100644 --- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -37,7 +37,7 @@ type RecvMessage = object type RecvService* = ref object of RootObj brokerCtx: BrokerContext node: WakuNode - internalMsgListener: MessageReceivedInternalEventListener + seenMsgListener: MessageSeenEventListener subscriptionService: SubscriptionService recentReceivedMsgs: seq[RecvMessage] @@ -176,9 +176,9 @@ proc startRecvService*(self: RecvService) = self.msgCheckerHandler = self.msgChecker() self.msgPrunerHandler = self.loopPruneOldMessages() - self.internalMsgListener = MessageReceivedInternalEvent.listen( + self.seenMsgListener = MessageSeenEvent.listen( self.brokerCtx, - proc(event: MessageReceivedInternalEvent) {.async: (raises: []).} = + proc(event: MessageSeenEvent) {.async: (raises: []).} = if not self.subscriptionService.isSubscribed( event.topic, event.message.contentTopic ): @@ -186,11 +186,11 @@ proc startRecvService*(self: RecvService) = self.processIncomingMessageOfInterest(event.topic, event.message), ).valueOr: - error "Failed to set MessageReceivedInternalEvent listener", error = error + error "Failed to set MessageSeenEvent listener", error = error quit(QuitFailure) proc stopRecvService*(self: RecvService) {.async.} = - MessageReceivedInternalEvent.dropListener(self.brokerCtx, self.internalMsgListener) + MessageSeenEvent.dropListener(self.brokerCtx, self.seenMsgListener) if not self.msgCheckerHandler.isNil(): await self.msgCheckerHandler.cancelAndWait() self.msgCheckerHandler = nil diff --git a/waku/node/delivery_service/subscription_service.nim b/waku/node/delivery_service/subscription_service.nim index 5e36fa69f..1270db60f 100644 --- a/waku/node/delivery_service/subscription_service.nim +++ 
b/waku/node/delivery_service/subscription_service.nim @@ -8,7 +8,6 @@ import waku_relay, common/broker/broker_context, events/delivery_events, - node/edge_driver, ] type SubscriptionService* = ref object of RootObj @@ -16,9 +15,7 @@ type SubscriptionService* = ref object of RootObj contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]] ## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only. ## A present key with an empty HashSet value means pubsubtopic already subscribed - ## (via subscribeShard()) but there's no specific content topic interest yet. - filterSubListener: OnFilterSubscribeEventListener - filterUnsubListener: OnFilterUnsubscribeEventListener + ## (via subscribePubsubTopics()) but there's no specific content topic interest yet. proc new*(T: typedesc[SubscriptionService], node: WakuNode): T = SubscriptionService( @@ -27,74 +24,86 @@ proc new*(T: typedesc[SubscriptionService], node: WakuNode): T = proc addContentTopicInterest( self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic -) = - try: - if not self.contentTopicSubs.hasKey(shard): - self.contentTopicSubs[shard] = initHashSet[ContentTopic]() +): Result[void, string] = + if not self.contentTopicSubs.hasKey(shard): + self.contentTopicSubs[shard] = initHashSet[ContentTopic]() - if not self.contentTopicSubs[shard].contains(topic): - self.contentTopicSubs[shard].incl(topic) + self.contentTopicSubs.withValue(shard, cTopics): + if not cTopics[].contains(topic): + cTopics[].incl(topic) - # Always notify EdgeDriver if filter is mounted - if not isNil(self.node.wakuFilterClient): - self.node.edgeDriver.subscribe(shard, topic) - except KeyError: - discard - -proc removeContentTopicInterest( - self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic -) = - try: - if self.contentTopicSubs.hasKey(shard) and - self.contentTopicSubs[shard].contains(topic): - self.contentTopicSubs[shard].excl(topic) - - # Only delete the shard tracking if we are not running a 
Relay. - # If Relay is mounted, we keep the empty HashSet to signal the relay shard sub. - if self.contentTopicSubs[shard].len == 0 and isNil(self.node.wakuRelay): - self.contentTopicSubs.del(shard) - - if not isNil(self.node.wakuFilterClient): - self.node.edgeDriver.unsubscribe(shard, topic) - except KeyError: - discard - -proc startProvidersAndListeners*(self: SubscriptionService): Result[void, string] = - self.filterSubListener = OnFilterSubscribeEvent.listen( - self.node.brokerCtx, - proc(event: OnFilterSubscribeEvent) {.async: (raises: []), gcsafe.} = - for cTopic in event.contentTopics: - self.addContentTopicInterest(event.pubsubTopic, cTopic), - ).valueOr: - return - err("SubscriptionService failed to listen to OnFilterSubscribeEvent: " & error) - - self.filterUnsubListener = OnFilterUnsubscribeEvent.listen( - self.node.brokerCtx, - proc(event: OnFilterUnsubscribeEvent) {.async: (raises: []), gcsafe.} = - for cTopic in event.contentTopics: - self.removeContentTopicInterest(event.pubsubTopic, cTopic), - ).valueOr: - return - err("SubscriptionService failed to listen to OnFilterUnsubscribeEvent: " & error) + # TODO: Call a "subscribe(shard, topic)" on filter client here, + # so the filter client can know that subscriptions changed. return ok() -proc stopProvidersAndListeners*(self: SubscriptionService) = - OnFilterSubscribeEvent.dropListener(self.node.brokerCtx, self.filterSubListener) - OnFilterUnsubscribeEvent.dropListener(self.node.brokerCtx, self.filterUnsubListener) +proc removeContentTopicInterest( + self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic +): Result[void, string] = + self.contentTopicSubs.withValue(shard, cTopics): + if cTopics[].contains(topic): + cTopics[].excl(topic) + + if cTopics[].len == 0 and isNil(self.node.wakuRelay): + self.contentTopicSubs.del(shard) # We're done with cTopics here + + # TODO: Call a "unsubscribe(shard, topic)" on filter client here, + # so the filter client can know that subscriptions changed. 
+ + return ok() + +proc subscribePubsubTopics*( + self: SubscriptionService, shards: seq[PubsubTopic] +): Result[void, string] = + if isNil(self.node.wakuRelay): + return err("subscribePubsubTopics requires a Relay") + + var errors: seq[string] = @[] + + for shard in shards: + if not self.contentTopicSubs.hasKey(shard): + self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr: + errors.add("shard " & shard & ": " & error) + continue + + self.contentTopicSubs[shard] = initHashSet[ContentTopic]() + + if errors.len > 0: + return err("subscribePubsubTopics errors: " & errors.join("; ")) + + return ok() + +proc startSubscriptionService*(self: SubscriptionService) = + if not isNil(self.node.wakuRelay): + if self.node.wakuAutoSharding.isSome(): + # Subscribe relay to all shards in autosharding. + let autoSharding = self.node.wakuAutoSharding.get() + let clusterId = autoSharding.clusterId + let numShards = autoSharding.shardCountGenZero + + if numShards > 0: + var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards) + + for i in 0 ..< numShards: + let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i)) + clusterPubsubTopics.add(PubsubTopic($shardObj)) + + self.subscribePubsubTopics(clusterPubsubTopics).isOkOr: + error "Failed to auto-subscribe Relay to cluster shards", error = error + else: + # NOTE: We can't fallback to configured shards when no autosharding here since + # we don't currently have access to Waku.conf here. However, we don't support + # manual/static sharding at the MAPI level anyway so wiring that up now is not needed. + # When we no longer auto-subscribe to all shards in Core boot, we will probably + # scan the shard config due to fleet nodes; then shard conf will have to be reachable here. + # For non-fleet, interactive Core nodes (e.g. 
Desktop apps) this can't matter + # as much since shard subscriptions originate from subscription to content topics, but + # I guess even in that case subbing to some conf shards may make sense for some apps. + info "SubscriptionService has no AutoSharding for Relay, won't subscribe to shards by default." -proc start*(self: SubscriptionService) = - # TODO: re-enable for MAPI edge support. - #self.startProvidersAndListeners().isOkOr: - # error "Fatal error in SubscriptionService.startProvidersAndListeners(): ", - # error = error - # raise newException(ValueError, "SubscriptionService.start() failed: " & error) discard -proc stop*(self: SubscriptionService) = - # TODO: re-enable for MAPI edge support. - #self.stopProvidersAndListeners() +proc stopSubscriptionService*(self: SubscriptionService) {.async.} = discard proc getActiveSubscriptions*( @@ -139,47 +148,6 @@ proc isSubscribed*( except KeyError: discard -proc subscribeShard*( - self: SubscriptionService, shards: seq[PubsubTopic] -): Result[void, string] = - if isNil(self.node.wakuRelay): - return err("subscribeShard requires a Relay") - - var errors: seq[string] = @[] - - for shard in shards: - if not self.contentTopicSubs.hasKey(shard): - self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr: - errors.add("shard " & shard & ": " & error) - continue - - self.contentTopicSubs[shard] = initHashSet[ContentTopic]() - - if errors.len > 0: - return err("subscribeShard errors: " & errors.join("; ")) - - return ok() - -proc unsubscribeShard*( - self: SubscriptionService, shards: seq[PubsubTopic] -): Result[void, string] = - if isNil(self.node.wakuRelay): - return err("unsubscribeShard requires a Relay") - - var errors: seq[string] = @[] - - for shard in shards: - if self.contentTopicSubs.hasKey(shard): - self.node.unsubscribe((kind: PubsubUnsub, topic: shard)).isOkOr: - errors.add("shard " & shard & ": " & error) - - self.contentTopicSubs.del(shard) - - if errors.len > 0: - return err("unsubscribeShard errors: 
" & errors.join("; ")) - - return ok() - proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] = if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient): return err("SubscriptionService requires either Relay or Filter Client.") @@ -187,9 +155,9 @@ proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, st let shard = ?self.getShardForContentTopic(topic) if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard): - ?self.subscribeShard(@[shard]) + ?self.subscribePubsubTopics(@[shard]) - self.addContentTopicInterest(shard, topic) + ?self.addContentTopicInterest(shard, topic) return ok() @@ -202,6 +170,6 @@ proc unsubscribe*( let shard = ?self.getShardForContentTopic(topic) if self.isSubscribed(shard, topic): - self.removeContentTopicInterest(shard, topic) + ?self.removeContentTopicInterest(shard, topic) return ok() diff --git a/waku/node/edge_driver.nim b/waku/node/edge_driver.nim deleted file mode 100644 index 582cc6319..000000000 --- a/waku/node/edge_driver.nim +++ /dev/null @@ -1,3 +0,0 @@ -import ./edge_driver/edge_driver - -export edge_driver diff --git a/waku/node/edge_driver/edge_driver.nim b/waku/node/edge_driver/edge_driver.nim deleted file mode 100644 index 93e47350d..000000000 --- a/waku/node/edge_driver/edge_driver.nim +++ /dev/null @@ -1,28 +0,0 @@ -{.push raises: [].} - -import chronicles, waku/waku_core/topics - -# Plan: -# - drive the continuous fulfillment and healing of edge peering and topic subscriptions -# - offload the edgeXXX stuff from WakuNode into this and finish it - -type EdgeDriver* = ref object of RootObj # TODO: bg worker, ... 
- -proc new*(T: typedesc[EdgeDriver]): T = - return EdgeDriver() - -proc start*(self: EdgeDriver) = - # TODO - debug "TODO: EdgeDriver: start bg worker" - -proc stop*(self: EdgeDriver) = - # TODO - debug "TODO: EdgeDriver: stop bg worker" - -proc subscribe*(self: EdgeDriver, shard: PubsubTopic, topic: ContentTopic) = - # TODO: this is an event that can be used to drive an event-driven edge health checker - debug "TODO: EdgeDriver: got subscribe notification", shard = shard, topic = topic - -proc unsubscribe*(self: EdgeDriver, shard: PubsubTopic, topic: ContentTopic) = - # TODO: this is an event that can be used to drive an event-driven edge health checker - debug "TODO: EdgeDriver: got unsubscribe notification", shard = shard, topic = topic diff --git a/waku/node/kernel_api/relay.nim b/waku/node/kernel_api/relay.nim index 6a49c72ca..ec4d05ddd 100644 --- a/waku/node/kernel_api/relay.nim +++ b/waku/node/kernel_api/relay.nim @@ -98,7 +98,7 @@ proc registerRelayHandler( node.wakuStoreReconciliation.messageIngress(topic, msg) proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - MessageReceivedInternalEvent.emit(node.brokerCtx, topic, msg) + MessageSeenEvent.emit(node.brokerCtx, topic, msg) let uniqueTopicHandler = proc( topic: PubsubTopic, msg: WakuMessage @@ -110,7 +110,7 @@ proc registerRelayHandler( await internalHandler(topic, msg) # Call the legacy (kernel API) app handler if it exists. - # Normally, hasKey is false and the MessageReceivedInternalEvent bus (new API) is used instead. + # Normally, hasKey is false and the MessageSeenEvent bus (new API) is used instead. # But we need to support legacy behavior (kernel API use), hence this. # NOTE: We can delete `legacyAppHandlers` if instead we refactor WakuRelay to support multiple # PubsubTopic handlers, since that's actually supported by libp2p PubSub (bigger refactor...) 
diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 32ce67255..0cef4cc5d 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -69,7 +69,6 @@ import waku/discovery/waku_kademlia, ./net_config, ./peer_manager, - ./edge_driver, ./health_monitor/health_status, ./health_monitor/topic_health @@ -115,7 +114,6 @@ type WakuNode* = ref object peerManager*: PeerManager switch*: Switch - edgeDriver*: EdgeDriver wakuRelay*: WakuRelay wakuArchive*: waku_archive.WakuArchive wakuLegacyArchive*: waku_archive_legacy.WakuArchive @@ -235,7 +233,6 @@ proc new*( let node = WakuNode( peerManager: peerManager, switch: switch, - edgeDriver: EdgeDriver.new(), rng: rng, brokerCtx: brokerCtx, enr: enr,