diff --git a/tests/api/test_all.nim b/tests/api/test_all.nim index 57f7f37f2..4617c8cdb 100644 --- a/tests/api/test_all.nim +++ b/tests/api/test_all.nim @@ -1,3 +1,8 @@ {.used.} -import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health +import + ./test_entry_nodes, + ./test_node_conf, + ./test_api_send, + ./test_api_subscription, + ./test_api_health diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim new file mode 100644 index 000000000..770dcd12d --- /dev/null +++ b/tests/api/test_api_subscription.nim @@ -0,0 +1,319 @@ +{.used.} + +import std/[strutils, net, options] +import chronos, testutils/unittests, stew/byteutils +import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto] +import ../testlib/[common, wakucore, wakunode, testasync] + +import + waku, + waku/[ + waku_node, + waku_core, + common/broker/broker_context, + events/message_events, + waku_relay/protocol, + ] +import waku/api/api_conf, waku/factory/waku_conf + +# TODO: Edge testing (after MAPI edge support is completed) + +const TestTimeout = chronos.seconds(10) +const NegativeTestTimeout = chronos.seconds(2) +const DefaultShard = PubsubTopic("/waku/2/rs/1/0") + +type ReceiveEventListenerManager = ref object + brokerCtx: BrokerContext + receivedListener: MessageReceivedEventListener + receivedEvent: AsyncEvent + receivedMessages: seq[WakuMessage] + targetCount: int + +proc newReceiveEventListenerManager( + brokerCtx: BrokerContext, expectedCount: int = 1 +): ReceiveEventListenerManager = + let manager = ReceiveEventListenerManager( + brokerCtx: brokerCtx, receivedMessages: @[], targetCount: expectedCount + ) + manager.receivedEvent = newAsyncEvent() + + manager.receivedListener = MessageReceivedEvent + .listen( + brokerCtx, + proc(event: MessageReceivedEvent) {.async: (raises: []).} = + manager.receivedMessages.add(event.message) + + if manager.receivedMessages.len >= manager.targetCount: + manager.receivedEvent.fire() + , + ) + .expect("Failed to listen to MessageReceivedEvent") + + return manager + +proc teardown(manager: ReceiveEventListenerManager) = + MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener) + +proc waitForEvents( + manager: ReceiveEventListenerManager, timeout: Duration +): Future[bool] {.async.} = + return await manager.receivedEvent.wait().withTimeout(timeout) + +proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig = + let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0) + result = NodeConfig.init( + mode = mode, + protocolsConfig = ProtocolsConfig.init( + entryNodes = @[], + clusterId = 1, + autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1), + ), + networkingConfig = netConf, + p2pReliability = true, + ) + +proc setupSubscriberNode(conf: NodeConfig): Future[Waku] {.async.} = + var node: Waku + lockNewGlobalBrokerContext: + node = (await createNode(conf)).expect("Failed to create subscriber node") + (await startWaku(addr node)).expect("Failed to start subscriber node") + return node + +proc waitForMesh*(node: WakuNode, shard: PubsubTopic) {.async.} = + for _ in 0 ..< 50: + if node.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0: + return + await sleepAsync(100.milliseconds) + raise newException(ValueError, "GossipSub Mesh failed to stabilize") + +proc publishWhenMeshReady( + publisher: WakuNode, + pubsubTopic: PubsubTopic, + contentTopic: ContentTopic, + payload: seq[byte], +): Future[Result[int, string]] {.async.} = + await waitForMesh(publisher, pubsubTopic) + + let msg = WakuMessage( + payload: payload, contentTopic: contentTopic, version: 0, timestamp: now() + ) + return await publisher.publish(some(pubsubTopic), msg) + +suite "Messaging API, SubscriptionService": + var + publisherNode {.threadvar.}: WakuNode + publisherPeerInfo {.threadvar.}: RemotePeerInfo + publisherPeerId {.threadvar.}: PeerId + + subscriberNode {.threadvar.}: Waku + + asyncSetup: + lockNewGlobalBrokerContext: + publisherNode = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + + publisherNode.mountMetadata(1, @[0'u16]).expect("Failed to mount metadata") + (await publisherNode.mountRelay()).expect("Failed to mount relay") + await publisherNode.mountLibp2pPing() + await publisherNode.start() + + publisherPeerInfo = publisherNode.peerInfo.toRemotePeerInfo() + publisherPeerId = publisherNode.peerInfo.peerId + + proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + discard + + publisherNode.subscribe((kind: PubsubSub, topic: DefaultShard), dummyHandler).expect( + "Failed to subscribe publisherNode" + ) + + asyncTeardown: + if not subscriberNode.isNil(): + (await subscriberNode.stop()).expect("Failed to stop subscriber node") + subscriberNode = nil + + if not publisherNode.isNil(): + await publisherNode.stop() + publisherNode = nil + + asyncTest "Subscription API, relay node auto subscribe and receive message": + subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) + await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + let testTopic = ContentTopic("/waku/2/test-content/proto") + + (await subscriberNode.subscribe(testTopic)).expect( + "subscriberNode failed to subscribe" + ) + + let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + defer: + eventManager.teardown() + + discard ( + await publishWhenMeshReady( + publisherNode, DefaultShard, testTopic, "Hello, world!".toBytes() + ) + ).expect("Publish failed") + + require await eventManager.waitForEvents(TestTimeout) + require eventManager.receivedMessages.len == 1 + check eventManager.receivedMessages[0].contentTopic == testTopic + + asyncTest "Subscription API, relay node ignores unsubscribed content topics on same shard": + subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) + await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + + let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto") + let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto") + (await subscriberNode.subscribe(subbedTopic)).expect("failed to subscribe") + + let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + defer: + eventManager.teardown() + + discard ( + await publishWhenMeshReady( + publisherNode, DefaultShard, ignoredTopic, "Ghost Msg".toBytes() + ) + ).expect("Publish failed") + + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 + + asyncTest "Subscription API, relay node unsubscribe stops message receipt": + subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) + await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + let testTopic = ContentTopic("/waku/2/unsub-test/proto") + + (await subscriberNode.subscribe(testTopic)).expect("failed to subscribe") + subscriberNode.unsubscribe(testTopic).expect("failed to unsubscribe") + + let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + defer: + eventManager.teardown() + + discard ( + await publishWhenMeshReady( + publisherNode, DefaultShard, testTopic, "Should be dropped".toBytes() + ) + ).expect("Publish failed") + + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 + + asyncTest "Subscription API, overlapping topics on same shard maintain correct isolation": + subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) + await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + + let topicA = ContentTopic("/waku/2/topic-a/proto") + let topicB = ContentTopic("/waku/2/topic-b/proto") + (await subscriberNode.subscribe(topicA)).expect("failed to sub A") + (await subscriberNode.subscribe(topicB)).expect("failed to sub B") + + let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + defer: + eventManager.teardown() + + await waitForMesh(publisherNode, DefaultShard) + + subscriberNode.unsubscribe(topicA).expect("failed to unsub A") + + discard ( + await publisherNode.publish( + some(DefaultShard), + WakuMessage( + payload: "Dropped Message".toBytes(), + contentTopic: topicA, + version: 0, + timestamp: now(), + ), + ) + ).expect("Publish A failed") + + discard ( + await publisherNode.publish( + some(DefaultShard), + WakuMessage( + payload: "Kept Msg".toBytes(), + contentTopic: topicB, + version: 0, + timestamp: now(), + ), + ) + ).expect("Publish B failed") + + require await eventManager.waitForEvents(TestTimeout) + require eventManager.receivedMessages.len == 1 + check eventManager.receivedMessages[0].contentTopic == topicB + + asyncTest "Subscription API, redundant subs tolerated and subs are removed": + subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) + await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + let glitchTopic = ContentTopic("/waku/2/glitch/proto") + + (await subscriberNode.subscribe(glitchTopic)).expect("failed to sub") + (await subscriberNode.subscribe(glitchTopic)).expect("failed to double sub") + subscriberNode.unsubscribe(glitchTopic).expect("failed to unsub") + + let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + defer: + eventManager.teardown() + + discard ( + await publishWhenMeshReady( + publisherNode, DefaultShard, glitchTopic, "Ghost Msg".toBytes() + ) + ).expect("Publish failed") + + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 + + asyncTest "Subscription API, resubscribe to an unsubscribed topic": + subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) + await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + let testTopic = ContentTopic("/waku/2/resub-test/proto") + + # Subscribe + (await subscriberNode.subscribe(testTopic)).expect("Initial sub failed") + + var eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + discard ( + await publishWhenMeshReady( + publisherNode, DefaultShard, testTopic, "Msg 1".toBytes() + ) + ).expect("Pub 1 failed") + + require await eventManager.waitForEvents(TestTimeout) + eventManager.teardown() + + # Unsubscribe and verify teardown + subscriberNode.unsubscribe(testTopic).expect("Unsub failed") + eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + + discard ( + await publisherNode.publish( + some(DefaultShard), + WakuMessage( + payload: "Ghost".toBytes(), + contentTopic: testTopic, + version: 0, + timestamp: now(), + ), + ) + ).expect("Ghost pub failed") + + check not await eventManager.waitForEvents(NegativeTestTimeout) + eventManager.teardown() + + # Resubscribe + (await subscriberNode.subscribe(testTopic)).expect("Resub failed") + eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + + discard ( + await publishWhenMeshReady( + publisherNode, DefaultShard, testTopic, "Msg 2".toBytes() + ) + ).expect("Pub 2 failed") + + require await eventManager.waitForEvents(TestTimeout) + check eventManager.receivedMessages[0].payload == "Msg 2".toBytes() diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index e30854906..9239435af 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -374,6 +374,12 @@ procSuite "WakuNode - Store": waitFor allFutures(client.stop(), server.stop()) test "Store protocol queries overrun request rate limitation": + when defined(macosx): + # on macos CI, this test is resulting a code 200 (OK) instead of a 429 error + # means the runner is somehow too slow to cause a request limit failure + skip() + return + ## Setup let serverKey = generateSecp256k1Key() diff --git a/waku/api/api.nim b/waku/api/api.nim index 3493513a3..f48c39c35 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -48,6 +48,15 @@ proc send*( ): Future[Result[RequestId, string]] {.async.} = ?checkApiAvailability(w) + let isSubbed = w.deliveryService.subscriptionService + .isSubscribed(envelope.contentTopic) + .valueOr(false) + if not isSubbed: + info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic + w.deliveryService.subscriptionService.subscribe(envelope.contentTopic).isOkOr: + warn "Failed to auto-subscribe", error = error + return err("Failed to auto-subscribe before sending: " & error) + let requestId = RequestId.new(w.rng) let deliveryTask = DeliveryTask.new(requestId, envelope, w.brokerCtx).valueOr: diff --git a/waku/events/message_events.nim b/waku/events/message_events.nim index cf3dac9b7..677a4a433 100644 --- a/waku/events/message_events.nim +++ b/waku/events/message_events.nim @@ -1,6 +1,4 @@ -import waku/common/broker/event_broker -import waku/api/types -import waku/waku_core/message +import waku/[api/types, waku_core/message, waku_core/topics, common/broker/event_broker] export types @@ -28,3 +26,9 @@ EventBroker: type MessageReceivedEvent* = object messageHash*: string message*: WakuMessage + +EventBroker: + # Internal event emitted when a message arrives from the network via any protocol + type MessageSeenEvent* = object + topic*: PubsubTopic + message*: WakuMessage diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 9803a53a9..c46f92fa8 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -35,6 +35,7 @@ import node/health_monitor, node/waku_metrics, node/delivery_service/delivery_service, + node/delivery_service/subscription_service, rest_api/message_cache, rest_api/endpoint/server, rest_api/endpoint/builder as rest_server_builder, @@ -453,7 +454,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: ).isOkOr: error "Failed to set RequestProtocolHealth provider", error = error - ## Setup RequestHealthReport provider (The lost child) + ## Setup RequestHealthReport provider RequestHealthReport.setProvider( globalBrokerContext(), @@ -514,6 +515,10 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = if not waku.wakuDiscv5.isNil(): await waku.wakuDiscv5.stop() + if not waku.deliveryService.isNil(): + await waku.deliveryService.stopDeliveryService() + waku.deliveryService = nil + if not waku.node.isNil(): await waku.node.stop() diff --git a/waku/node/delivery_service/delivery_service.nim b/waku/node/delivery_service/delivery_service.nim index 8106cba9f..f8b275fe7 100644 --- a/waku/node/delivery_service/delivery_service.nim +++ b/waku/node/delivery_service/delivery_service.nim @@ -38,9 +38,11 @@ proc new*( ) proc startDeliveryService*(self: DeliveryService) = - self.sendService.startSendService() + self.subscriptionService.startSubscriptionService() self.recvService.startRecvService() + self.sendService.startSendService() proc stopDeliveryService*(self: DeliveryService) {.async.} = - self.sendService.stopSendService() + await self.sendService.stopSendService() await self.recvService.stopRecvService() + await self.subscriptionService.stopSubscriptionService() diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 12780033a..6706d5cb1 100644 --- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -2,7 +2,7 @@ ## receive and is backed by store-v3 requests to get an additional degree of certainty ## -import std/[tables, sequtils, options] +import std/[tables, sequtils, options, sets] import chronos, chronicles, libp2p/utility import ../[subscription_service] import @@ -13,6 +13,7 @@ import waku_filter_v2/client, waku_core/topics, events/delivery_events, + events/message_events, waku_node, common/broker/broker_context, ] @@ -35,13 +36,8 @@ type RecvMessage = object type RecvService* = ref object of RootObj brokerCtx: BrokerContext - topicsInterest: Table[PubsubTopic, seq[ContentTopic]] - ## Tracks message verification requests and when was the last time a - ## pubsub topic was verified for missing messages - ## The key contains pubsub-topics node: WakuNode - onSubscribeListener: OnFilterSubscribeEventListener - onUnsubscribeListener: OnFilterUnsubscribeEventListener + seenMsgListener: MessageSeenEventListener subscriptionService: SubscriptionService recentReceivedMsgs: seq[RecvMessage] @@ -95,20 +91,20 @@ proc msgChecker(self: RecvService) {.async.} = self.endTimeToCheck = getNowInNanosecondTime() var msgHashesInStore = newSeq[WakuMessageHash](0) - for pubsubTopic, cTopics in self.topicsInterest.pairs: + for sub in self.subscriptionService.getActiveSubscriptions(): let storeResp: StoreQueryResponse = ( await self.node.wakuStoreClient.queryToAny( StoreQueryRequest( includeData: false, - pubsubTopic: some(PubsubTopic(pubsubTopic)), - contentTopics: cTopics, + pubsubTopic: some(PubsubTopic(sub.pubsubTopic)), + contentTopics: sub.contentTopics, startTime: some(self.startTimeToCheck - DelayExtra.nanos), endTime: some(self.endTimeToCheck + DelayExtra.nanos), ) ) ).valueOr: error "msgChecker failed to get remote msgHashes", - pubsubTopic, cTopics, error = $error + pubsubTopic = sub.pubsubTopic, cTopics = sub.contentTopics, error = $error continue msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash)) @@ -133,29 +129,18 @@ proc msgChecker(self: RecvService) {.async.} = ## update next check times self.startTimeToCheck = self.endTimeToCheck -proc onSubscribe( - self: RecvService, pubsubTopic: string, contentTopics: seq[string] -) {.gcsafe, raises: [].} = - info "onSubscribe", pubsubTopic, contentTopics - self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest): - contentTopicsOfInterest[].add(contentTopics) - do: - self.topicsInterest[pubsubTopic] = contentTopics +proc processIncomingMessageOfInterest( + self: RecvService, pubsubTopic: string, message: WakuMessage +) = + ## Resolve an incoming network message that was already filtered by topic. + ## Deduplicate (by hash), store (saves in recently-seen messages) and emit + ## the MAPI MessageReceivedEvent for every unique incoming message. -proc onUnsubscribe( - self: RecvService, pubsubTopic: string, contentTopics: seq[string] -) {.gcsafe, raises: [].} = - info "onUnsubscribe", pubsubTopic, contentTopics - - self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest): - let remainingCTopics = - contentTopicsOfInterest[].filterIt(not contentTopics.contains(it)) - contentTopicsOfInterest[] = remainingCTopics - - if remainingCTopics.len == 0: - self.topicsInterest.del(pubsubTopic) - do: - error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics + let msgHash = computeMessageHash(pubsubTopic, message) + if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash): + let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) + self.recentReceivedMsgs.add(rxMsg) + MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message) proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T = ## The storeClient will help to acquire any possible missed messages @@ -166,21 +151,12 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T = startTimeToCheck: now, brokerCtx: node.brokerCtx, subscriptionService: s, - topicsInterest: initTable[PubsubTopic, seq[ContentTopic]](), recentReceivedMsgs: @[], ) - if not node.wakuFilterClient.isNil(): - let filterPushHandler = proc( - pubsubTopic: PubsubTopic, message: WakuMessage - ) {.async, closure.} = - ## Captures all the messages received through filter - - let msgHash = computeMessageHash(pubSubTopic, message) - let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) - recvService.recentReceivedMsgs.add(rxMsg) - - node.wakuFilterClient.registerPushHandler(filterPushHandler) + # TODO: For MAPI Edge support, either call node.wakuFilterClient.registerPushHandler + # so that the RecvService listens to incoming filter messages, + # or have the filter client emit MessageSeenEvent. return recvService @@ -194,26 +170,24 @@ proc startRecvService*(self: RecvService) = self.msgCheckerHandler = self.msgChecker() self.msgPrunerHandler = self.loopPruneOldMessages() - self.onSubscribeListener = OnFilterSubscribeEvent.listen( + self.seenMsgListener = MessageSeenEvent.listen( self.brokerCtx, - proc(subsEv: OnFilterSubscribeEvent) {.async: (raises: []).} = - self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics), - ).valueOr: - error "Failed to set OnFilterSubscribeEvent listener", error = error - quit(QuitFailure) + proc(event: MessageSeenEvent) {.async: (raises: []).} = + if not self.subscriptionService.isSubscribed( + event.topic, event.message.contentTopic + ): + return - self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen( - self.brokerCtx, - proc(subsEv: OnFilterUnsubscribeEvent) {.async: (raises: []).} = - self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics), + self.processIncomingMessageOfInterest(event.topic, event.message), ).valueOr: - error "Failed to set OnFilterUnsubscribeEvent listener", error = error + error "Failed to set MessageSeenEvent listener", error = error quit(QuitFailure) proc stopRecvService*(self: RecvService) {.async.} = - OnFilterSubscribeEvent.dropListener(self.brokerCtx, self.onSubscribeListener) - OnFilterUnsubscribeEvent.dropListener(self.brokerCtx, self.onUnsubscribeListener) + MessageSeenEvent.dropListener(self.brokerCtx, self.seenMsgListener) if not self.msgCheckerHandler.isNil(): await self.msgCheckerHandler.cancelAndWait() + self.msgCheckerHandler = nil if not self.msgPrunerHandler.isNil(): await self.msgPrunerHandler.cancelAndWait() + self.msgPrunerHandler = nil diff --git a/waku/node/delivery_service/send_service/send_service.nim b/waku/node/delivery_service/send_service/send_service.nim index a41d07786..4c917c7e4 100644 --- a/waku/node/delivery_service/send_service/send_service.nim +++ b/waku/node/delivery_service/send_service/send_service.nim @@ -250,9 +250,9 @@ proc serviceLoop(self: SendService) {.async.} = proc startSendService*(self: SendService) = self.serviceLoopHandle = self.serviceLoop() -proc stopSendService*(self: SendService) = +proc stopSendService*(self: SendService) {.async.} = if not self.serviceLoopHandle.isNil(): - discard self.serviceLoopHandle.cancelAndWait() + await self.serviceLoopHandle.cancelAndWait() proc send*(self: SendService, task: DeliveryTask) {.async.} = assert(not task.isNil(), "task for send must not be nil") diff --git a/waku/node/delivery_service/subscription_service.nim b/waku/node/delivery_service/subscription_service.nim index 78763161b..1270db60f 100644 --- a/waku/node/delivery_service/subscription_service.nim +++ b/waku/node/delivery_service/subscription_service.nim @@ -1,64 +1,175 @@ -import chronos, chronicles +import std/[sets, tables, options, strutils], chronos, chronicles, results import waku/[ waku_core, waku_core/topics, - events/message_events, + waku_core/topics/sharding, waku_node, + waku_relay, common/broker/broker_context, + events/delivery_events, ] type SubscriptionService* = ref object of RootObj - brokerCtx: BrokerContext node: WakuNode + contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]] + ## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only. + ## A present key with an empty HashSet value means pubsubtopic already subscribed + ## (via subscribePubsubTopics()) but there's no specific content topic interest yet. proc new*(T: typedesc[SubscriptionService], node: WakuNode): T = - ## The storeClient will help to acquire any possible missed messages + SubscriptionService( + node: node, contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]]() + ) - return SubscriptionService(brokerCtx: node.brokerCtx, node: node) +proc addContentTopicInterest( + self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic +): Result[void, string] = + if not self.contentTopicSubs.hasKey(shard): + self.contentTopicSubs[shard] = initHashSet[ContentTopic]() + + self.contentTopicSubs.withValue(shard, cTopics): + if not cTopics[].contains(topic): + cTopics[].incl(topic) + + # TODO: Call a "subscribe(shard, topic)" on filter client here, + # so the filter client can know that subscriptions changed. + + return ok() + +proc removeContentTopicInterest( + self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic +): Result[void, string] = + self.contentTopicSubs.withValue(shard, cTopics): + if cTopics[].contains(topic): + cTopics[].excl(topic) + + if cTopics[].len == 0 and isNil(self.node.wakuRelay): + self.contentTopicSubs.del(shard) # We're done with cTopics here + + # TODO: Call a "unsubscribe(shard, topic)" on filter client here, + # so the filter client can know that subscriptions changed. + + return ok() + +proc subscribePubsubTopics*( + self: SubscriptionService, shards: seq[PubsubTopic] +): Result[void, string] = + if isNil(self.node.wakuRelay): + return err("subscribeShard requires a Relay") + + var errors: seq[string] = @[] + + for shard in shards: + if not self.contentTopicSubs.hasKey(shard): + self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr: + errors.add("shard " & shard & ": " & error) + continue + + self.contentTopicSubs[shard] = initHashSet[ContentTopic]() + + if errors.len > 0: + return err("subscribeShard errors: " & errors.join("; ")) + + return ok() + +proc startSubscriptionService*(self: SubscriptionService) = + if not isNil(self.node.wakuRelay): + if self.node.wakuAutoSharding.isSome(): + # Subscribe relay to all shards in autosharding. + let autoSharding = self.node.wakuAutoSharding.get() + let clusterId = autoSharding.clusterId + let numShards = autoSharding.shardCountGenZero + + if numShards > 0: + var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards) + + for i in 0 ..< numShards: + let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i)) + clusterPubsubTopics.add(PubsubTopic($shardObj)) + + self.subscribePubsubTopics(clusterPubsubTopics).isOkOr: + error "Failed to auto-subscribe Relay to cluster shards: ", error = error + else: + # NOTE: We can't fallback to configured shards when no autosharding here since + # we don't currently have access to Waku.conf here. However, we don't support + # manual/static sharding at the MAPI level anyway so wiring that up now is not needed. + # When we no longer auto-subscribe to all shards in Core boot, we will probably + # scan the shard config due to fleet nodes; then shard conf will have to be reachable here. + # For non-fleet, interactive Core nodes (e.g. Desktop apps) this can't matter + # as much since shard subscriptions originate from subscription to content topics, but + # I guess even in that case subbing to some conf shards may make sense for some apps. + info "SubscriptionService has no AutoSharding for Relay, won't subscribe to shards by default." + + discard + +proc stopSubscriptionService*(self: SubscriptionService) {.async.} = + discard + +proc getActiveSubscriptions*( + self: SubscriptionService +): seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] = + var activeSubs: seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] = + @[] + + for pubsub, cTopicSet in self.contentTopicSubs.pairs: + if cTopicSet.len > 0: + var cTopicSeq = newSeqOfCap[ContentTopic](cTopicSet.len) + for t in cTopicSet: + cTopicSeq.add(t) + activeSubs.add((pubsub, cTopicSeq)) + + return activeSubs + +proc getShardForContentTopic( + self: SubscriptionService, topic: ContentTopic +): Result[PubsubTopic, string] = + if self.node.wakuAutoSharding.isSome(): + let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic) + return ok($shardObj) + + return err("SubscriptionService requires AutoSharding") proc isSubscribed*( self: SubscriptionService, topic: ContentTopic ): Result[bool, string] = - var isSubscribed = false - if self.node.wakuRelay.isNil() == false: - return self.node.isSubscribed((kind: ContentSub, topic: topic)) + let shard = ?self.getShardForContentTopic(topic) + return ok( + self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(topic) + ) - # TODO: Add support for edge mode with Filter subscription management - return ok(isSubscribed) - -#TODO: later PR may consider to refactor or place this function elsewhere -# The only important part is that it emits MessageReceivedEvent -proc getReceiveHandler(self: SubscriptionService): WakuRelayHandler = - return proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = - let msgHash = computeMessageHash(topic, msg).to0xHex() - info "API received message", - pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash - - MessageReceivedEvent.emit(self.brokerCtx, msgHash, msg) +proc isSubscribed*( + self: SubscriptionService, shard: PubsubTopic, contentTopic: ContentTopic +): bool {.raises: [].} = + try: + return + self.contentTopicSubs.hasKey(shard) and + self.contentTopicSubs[shard].contains(contentTopic) + except KeyError: + discard proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] = - let isSubscribed = self.isSubscribed(topic).valueOr: - error "Failed to check subscription status: ", error = error - return err("Failed to check subscription status: " & error) + if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient): + return err("SubscriptionService requires either Relay or Filter Client.") - if isSubscribed == false: - if self.node.wakuRelay.isNil() == false: - self.node.subscribe((kind: ContentSub, topic: topic), self.getReceiveHandler()).isOkOr: - error "Failed to subscribe: ", error = error - return err("Failed to subscribe: " & error) + let shard = ?self.getShardForContentTopic(topic) - # TODO: Add support for edge mode with Filter subscription management + if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard): + ?self.subscribePubsubTopics(@[shard]) + + ?self.addContentTopicInterest(shard, topic) return ok() proc unsubscribe*( self: SubscriptionService, topic: ContentTopic ): Result[void, string] = - if self.node.wakuRelay.isNil() == false: - self.node.unsubscribe((kind: ContentSub, topic: topic)).isOkOr: - error "Failed to unsubscribe: ", error = error - return err("Failed to unsubscribe: " & error) + if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient): + return err("SubscriptionService requires either Relay or Filter Client.") + + let shard = ?self.getShardForContentTopic(topic) + + if self.isSubscribed(shard, topic): + ?self.removeContentTopicInterest(shard, topic) - # TODO: Add support for edge mode with Filter subscription management return ok() diff --git a/waku/node/kernel_api/relay.nim b/waku/node/kernel_api/relay.nim index a0a128449..ec4d05ddd 100644 --- a/waku/node/kernel_api/relay.nim +++ b/waku/node/kernel_api/relay.nim @@ -19,16 +19,20 @@ import libp2p/utility import - ../waku_node, - ../../waku_relay, - ../../waku_core, - ../../waku_core/topics/sharding, - ../../waku_filter_v2, - ../../waku_archive_legacy, - ../../waku_archive, - ../../waku_store_sync, - ../peer_manager, - ../../waku_rln_relay + waku/[ + waku_relay, + waku_core, + waku_core/topics/sharding, + waku_filter_v2, + waku_archive_legacy, + waku_archive, + waku_store_sync, + waku_rln_relay, + node/waku_node, + node/peer_manager, + common/broker/broker_context, + events/message_events, + ] export waku_relay.WakuRelayHandler @@ -44,14 +48,25 @@ logScope: ## Waku relay proc registerRelayHandler( - node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler -) = + node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler = nil +): bool = ## Registers the only handler for the given topic. ## Notice that this handler internally calls other handlers, such as filter, ## archive, etc, plus the handler provided by the application. + ## Returns `true` if a mesh subscription was created or `false` if the relay + ## was already subscribed to the topic. - if node.wakuRelay.isSubscribed(topic): - return + let alreadySubscribed = node.wakuRelay.isSubscribed(topic) + + if not appHandler.isNil(): + if not alreadySubscribed or not node.legacyAppHandlers.hasKey(topic): + node.legacyAppHandlers[topic] = appHandler + else: + debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler", + topic = topic + + if alreadySubscribed: + return false proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = let msgSizeKB = msg.payload.len / 1000 @@ -82,6 +97,9 @@ proc registerRelayHandler( node.wakuStoreReconciliation.messageIngress(topic, msg) + proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + MessageSeenEvent.emit(node.brokerCtx, topic, msg) + let uniqueTopicHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = @@ -89,7 +107,15 @@ proc registerRelayHandler( await filterHandler(topic, msg) await archiveHandler(topic, msg) await syncHandler(topic, msg) - await appHandler(topic, msg) + await internalHandler(topic, msg) + + # Call the legacy (kernel API) app handler if it exists. + # Normally, hasKey is false and the MessageSeenEvent bus (new API) is used instead. + # But we need to support legacy behavior (kernel API use), hence this. + # NOTE: We can delete `legacyAppHandlers` if instead we refactor WakuRelay to support multiple + # PubsubTopic handlers, since that's actually supported by libp2p PubSub (bigger refactor...) + if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil(): + await node.legacyAppHandlers[topic](topic, msg) node.wakuRelay.subscribe(topic, uniqueTopicHandler) @@ -115,8 +141,11 @@ proc subscribe*( ): Result[void, string] = ## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on ## this topic. WakuRelayHandler is a method that takes a topic and a Waku message. + ## If `handler` is nil, the API call will subscribe to the topic in the relay mesh + ## but no app handler will be registered at this time (it can be registered later with + ## another call to this proc for the same gossipsub topic). - if node.wakuRelay.isNil(): + if isNil(node.wakuRelay): error "Invalid API call to `subscribe`. WakuRelay not mounted." return err("Invalid API call to `subscribe`. WakuRelay not mounted.") @@ -124,13 +153,15 @@ proc subscribe*( error "Failed to decode subscription event", error = error return err("Failed to decode subscription event: " & error) - if node.wakuRelay.isSubscribed(pubsubTopic): - warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic - return ok() - - info "subscribe", pubsubTopic, contentTopicOp - node.registerRelayHandler(pubsubTopic, handler) - node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) + if node.registerRelayHandler(pubsubTopic, handler): + info "subscribe", pubsubTopic, contentTopicOp + node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) + else: + if isNil(handler): + warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic + else: + info "subscribe (was already subscribed in the mesh; appHandler set)", + pubsubTopic = pubsubTopic return ok() @@ -138,8 +169,10 @@ proc unsubscribe*( node: WakuNode, subscription: SubscriptionEvent ): Result[void, string] = ## Unsubscribes from a specific PubSub or Content topic. + ## This will both unsubscribe from the relay mesh and remove the app handler, if any. + ## NOTE: This works because using MAPI and Kernel API at the same time is unsupported. - if node.wakuRelay.isNil(): + if isNil(node.wakuRelay): error "Invalid API call to `unsubscribe`. WakuRelay not mounted." return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.") @@ -147,13 +180,20 @@ proc unsubscribe*( error "Failed to decode unsubscribe event", error = error return err("Failed to decode unsubscribe event: " & error) - if not node.wakuRelay.isSubscribed(pubsubTopic): - warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic - return ok() + let hadHandler = node.legacyAppHandlers.hasKey(pubsubTopic) + if hadHandler: + node.legacyAppHandlers.del(pubsubTopic) - info "unsubscribe", pubsubTopic, contentTopicOp - node.wakuRelay.unsubscribe(pubsubTopic) - node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) + if node.wakuRelay.isSubscribed(pubsubTopic): + info "unsubscribe", pubsubTopic, contentTopicOp + node.wakuRelay.unsubscribe(pubsubTopic) + node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) + else: + if not hadHandler: + warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic + else: + info "unsubscribe (was not subscribed in the mesh; appHandler removed)", + pubsubTopic = pubsubTopic return ok() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 254387c32..0cef4cc5d 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -146,6 +146,8 @@ type started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] rateLimitSettings*: ProtocolRateLimitSettings + legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler] + ## Kernel API Relay appHandlers (if any) wakuMix*: WakuMix edgeTopicsHealth*: Table[PubsubTopic, TopicHealth] edgeHealthEvent*: AsyncEvent