diff --git a/tests/api/test_all.nim b/tests/api/test_all.nim index 57f7f37f2..4617c8cdb 100644 --- a/tests/api/test_all.nim +++ b/tests/api/test_all.nim @@ -1,3 +1,8 @@ {.used.} -import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health +import + ./test_entry_nodes, + ./test_node_conf, + ./test_api_send, + ./test_api_subscription, + ./test_api_health diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim new file mode 100644 index 000000000..8983c2934 --- /dev/null +++ b/tests/api/test_api_subscription.nim @@ -0,0 +1,399 @@ +{.used.} + +import std/[strutils, net, options, sets] +import chronos, testutils/unittests, stew/byteutils +import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto] +import ../testlib/[common, wakucore, wakunode, testasync] + +import + waku, + waku/[ + waku_node, + waku_core, + common/broker/broker_context, + events/message_events, + waku_relay/protocol, + ] +import waku/api/api_conf, waku/factory/waku_conf + +# TODO: Edge testing (after MAPI edge support is completed) + +const TestTimeout = chronos.seconds(10) +const NegativeTestTimeout = chronos.seconds(2) + +type ReceiveEventListenerManager = ref object + brokerCtx: BrokerContext + receivedListener: MessageReceivedEventListener + receivedEvent: AsyncEvent + receivedMessages: seq[WakuMessage] + targetCount: int + +proc newReceiveEventListenerManager( + brokerCtx: BrokerContext, expectedCount: int = 1 +): ReceiveEventListenerManager = + let manager = ReceiveEventListenerManager( + brokerCtx: brokerCtx, receivedMessages: @[], targetCount: expectedCount + ) + manager.receivedEvent = newAsyncEvent() + + manager.receivedListener = MessageReceivedEvent + .listen( + brokerCtx, + proc(event: MessageReceivedEvent) {.async: (raises: []).} = + manager.receivedMessages.add(event.message) + + if manager.receivedMessages.len >= manager.targetCount: + manager.receivedEvent.fire() + , + ) + .expect("Failed to listen to MessageReceivedEvent") + + return manager + +proc teardown(manager: ReceiveEventListenerManager) = + MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener) + +proc waitForEvents( + manager: ReceiveEventListenerManager, timeout: Duration +): Future[bool] {.async.} = + return await manager.receivedEvent.wait().withTimeout(timeout) + +type TestNetwork = ref object + publisher: WakuNode + subscriber: Waku + publisherPeerInfo: RemotePeerInfo + +proc createApiNodeConf( + mode: WakuMode = WakuMode.Core, numShards: uint16 = 1 +): NodeConfig = + let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0) + result = NodeConfig.init( + mode = mode, + protocolsConfig = ProtocolsConfig.init( + entryNodes = @[], + clusterId = 1, + autoShardingConfig = AutoShardingConfig(numShardsInCluster: numShards), + ), + networkingConfig = netConf, + p2pReliability = true, + ) + +proc setupSubscriberNode(conf: NodeConfig): Future[Waku] {.async.} = + var node: Waku + lockNewGlobalBrokerContext: + node = (await createNode(conf)).expect("Failed to create subscriber node") + (await startWaku(addr node)).expect("Failed to start subscriber node") + return node + +proc setupNetwork( + numShards: uint16 = 1, mode: WakuMode = WakuMode.Core +): Future[TestNetwork] {.async.} = + var net = TestNetwork() + + lockNewGlobalBrokerContext: + net.publisher = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + net.publisher.mountMetadata(1, @[0'u16]).expect("Failed to mount metadata") + (await net.publisher.mountRelay()).expect("Failed to mount relay") + await net.publisher.mountLibp2pPing() + await net.publisher.start() + + net.publisherPeerInfo = net.publisher.peerInfo.toRemotePeerInfo() + + proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + discard + + # Subscribe the publisher to all shards to guarantee a GossipSub mesh with the subscriber. + # Currently, Core/Relay nodes auto-subscribe to all network shards on boot, but if + # that changes, this will be needed to cause the publisher to have shard interest + # for any shards the subscriber may want to use, which is required for waitForMesh to work. + for i in 0 ..< numShards.int: + let shard = PubsubTopic("/waku/2/rs/1/" & $i) + net.publisher.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect( + "Failed to sub publisher" + ) + + net.subscriber = await setupSubscriberNode(createApiNodeConf(mode, numShards)) + + await net.subscriber.node.connectToNodes(@[net.publisherPeerInfo]) + + return net + +proc teardown(net: TestNetwork) {.async.} = + if not isNil(net.subscriber): + (await net.subscriber.stop()).expect("Failed to stop subscriber node") + net.subscriber = nil + + if not isNil(net.publisher): + await net.publisher.stop() + net.publisher = nil + +proc getRelayShard(node: WakuNode, contentTopic: ContentTopic): PubsubTopic = + let autoSharding = node.wakuAutoSharding.get() + let shardObj = autoSharding.getShard(contentTopic).expect("Failed to get shard") + return PubsubTopic($shardObj) + +proc waitForMesh(node: WakuNode, shard: PubsubTopic) {.async.} = + for _ in 0 ..< 50: + if node.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0: + return + await sleepAsync(100.milliseconds) + raise newException(ValueError, "GossipSub Mesh failed to stabilize on " & shard) + +proc publishToMesh( + net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte] +): Future[Result[int, string]] {.async.} = + let shard = net.subscriber.node.getRelayShard(contentTopic) + + await waitForMesh(net.publisher, shard) + + let msg = WakuMessage( + payload: payload, contentTopic: contentTopic, version: 0, timestamp: now() + ) + return await net.publisher.publish(some(shard), msg) + +suite "Messaging API, SubscriptionManager": + asyncTest "Subscription API, relay node auto subscribe and receive message": + let net = await setupNetwork(1) + defer: + await net.teardown() + + let testTopic = ContentTopic("/waku/2/test-content/proto") + (await net.subscriber.subscribe(testTopic)).expect( + "subscriberNode failed to subscribe" + ) + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + discard (await net.publishToMesh(testTopic, "Hello, world!".toBytes())).expect( + "Publish failed" + ) + + require await eventManager.waitForEvents(TestTimeout) + require eventManager.receivedMessages.len == 1 + check eventManager.receivedMessages[0].contentTopic == testTopic + + asyncTest "Subscription API, relay node ignores unsubscribed content topics on same shard": + let net = await setupNetwork(1) + defer: + await net.teardown() + + let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto") + let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto") + (await net.subscriber.subscribe(subbedTopic)).expect("failed to subscribe") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + discard (await net.publishToMesh(ignoredTopic, "Ghost Msg".toBytes())).expect( + "Publish failed" + ) + + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 + + asyncTest "Subscription API, relay node unsubscribe stops message receipt": + let net = await setupNetwork(1) + defer: + await net.teardown() + + let testTopic = ContentTopic("/waku/2/unsub-test/proto") + + (await net.subscriber.subscribe(testTopic)).expect("failed to subscribe") + net.subscriber.unsubscribe(testTopic).expect("failed to unsubscribe") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + discard (await net.publishToMesh(testTopic, "Should be dropped".toBytes())).expect( + "Publish failed" + ) + + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 + + asyncTest "Subscription API, overlapping topics on same shard maintain correct isolation": + let net = await setupNetwork(1) + defer: + await net.teardown() + + let topicA = ContentTopic("/waku/2/topic-a/proto") + let topicB = ContentTopic("/waku/2/topic-b/proto") + (await net.subscriber.subscribe(topicA)).expect("failed to sub A") + (await net.subscriber.subscribe(topicB)).expect("failed to sub B") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + net.subscriber.unsubscribe(topicA).expect("failed to unsub A") + + discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect( + "Publish A failed" + ) + discard + (await net.publishToMesh(topicB, "Kept Msg".toBytes())).expect("Publish B failed") + + require await eventManager.waitForEvents(TestTimeout) + require eventManager.receivedMessages.len == 1 + check eventManager.receivedMessages[0].contentTopic == topicB + + asyncTest "Subscription API, redundant subs tolerated and subs are removed": + let net = await setupNetwork(1) + defer: + await net.teardown() + + let glitchTopic = ContentTopic("/waku/2/glitch/proto") + + (await net.subscriber.subscribe(glitchTopic)).expect("failed to sub") + (await net.subscriber.subscribe(glitchTopic)).expect("failed to double sub") + net.subscriber.unsubscribe(glitchTopic).expect("failed to unsub") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + discard (await net.publishToMesh(glitchTopic, "Ghost Msg".toBytes())).expect( + "Publish failed" + ) + + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 + + asyncTest "Subscription API, resubscribe to an unsubscribed topic": + let net = await setupNetwork(1) + defer: + await net.teardown() + + let testTopic = ContentTopic("/waku/2/resub-test/proto") + + # Subscribe + (await net.subscriber.subscribe(testTopic)).expect("Initial sub failed") + + var eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + discard + (await net.publishToMesh(testTopic, "Msg 1".toBytes())).expect("Pub 1 failed") + + require await eventManager.waitForEvents(TestTimeout) + eventManager.teardown() + + # Unsubscribe and verify teardown + net.subscriber.unsubscribe(testTopic).expect("Unsub failed") + eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + + discard + (await net.publishToMesh(testTopic, "Ghost".toBytes())).expect("Ghost pub failed") + + check not await eventManager.waitForEvents(NegativeTestTimeout) + eventManager.teardown() + + # Resubscribe + (await net.subscriber.subscribe(testTopic)).expect("Resub failed") + eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + + discard + (await net.publishToMesh(testTopic, "Msg 2".toBytes())).expect("Pub 2 failed") + + require await eventManager.waitForEvents(TestTimeout) + check eventManager.receivedMessages[0].payload == "Msg 2".toBytes() + + asyncTest "Subscription API, two content topics in different shards": + let net = await setupNetwork(8) + defer: + await net.teardown() + + var topicA = ContentTopic("/appA/2/shard-test-a/proto") + var topicB = ContentTopic("/appB/2/shard-test-b/proto") + + # generate two content topics that land in two different shards + var i = 0 + while net.subscriber.node.getRelayShard(topicA) == + net.subscriber.node.getRelayShard(topicB): + topicB = ContentTopic("/appB" & $i & "/2/shard-test-b/proto") + inc i + + (await net.subscriber.subscribe(topicA)).expect("failed to sub A") + (await net.subscriber.subscribe(topicB)).expect("failed to sub B") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 2) + defer: + eventManager.teardown() + + discard (await net.publishToMesh(topicA, "Msg on Shard A".toBytes())).expect( + "Publish A failed" + ) + discard (await net.publishToMesh(topicB, "Msg on Shard B".toBytes())).expect( + "Publish B failed" + ) + + require await eventManager.waitForEvents(TestTimeout) + require eventManager.receivedMessages.len == 2 + + asyncTest "Subscription API, many content topics in many shards": + let net = await setupNetwork(8) + defer: + await net.teardown() + + var allTopics: seq[ContentTopic] + for i in 0 ..< 100: + allTopics.add(ContentTopic("/stress-app-" & $i & "/2/state-test/proto")) + + var activeSubs: seq[ContentTopic] + + proc verifyNetworkState(expected: seq[ContentTopic]) {.async.} = + let eventManager = + newReceiveEventListenerManager(net.subscriber.brokerCtx, expected.len) + + for topic in allTopics: + discard (await net.publishToMesh(topic, "Stress Payload".toBytes())).expect( + "publish failed" + ) + + require await eventManager.waitForEvents(TestTimeout) + + # here we just give a chance for any messages that we don't expect to arrive + await sleepAsync(1.seconds) + eventManager.teardown() + + # weak check (but catches most bugs) + require eventManager.receivedMessages.len == expected.len + + # strict expected receipt test + var receivedTopics = initHashSet[ContentTopic]() + for msg in eventManager.receivedMessages: + receivedTopics.incl(msg.contentTopic) + var expectedTopics = initHashSet[ContentTopic]() + for t in expected: + expectedTopics.incl(t) + + check receivedTopics == expectedTopics + + # subscribe to all content topics we generated + for t in allTopics: + (await net.subscriber.subscribe(t)).expect("sub failed") + activeSubs.add(t) + + await verifyNetworkState(activeSubs) + + # unsubscribe from some content topics + for i in 0 ..< 50: + let t = allTopics[i] + net.subscriber.unsubscribe(t).expect("unsub failed") + + let idx = activeSubs.find(t) + if idx >= 0: + activeSubs.del(idx) + + await verifyNetworkState(activeSubs) + + # re-subscribe to some content topics + for i in 0 ..< 25: + let t = allTopics[i] + (await net.subscriber.subscribe(t)).expect("resub failed") + activeSubs.add(t) + + await verifyNetworkState(activeSubs) diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index e30854906..9239435af 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -374,6 +374,12 @@ procSuite "WakuNode - Store": waitFor allFutures(client.stop(), server.stop()) test "Store protocol queries overrun request rate limitation": + when defined(macosx): + # on macos CI, this test is resulting a code 200 (OK) instead of a 429 error + # means the runner is somehow too slow to cause a request limit failure + skip() + return + ## Setup let serverKey = generateSecp256k1Key() diff --git a/waku/api/api.nim b/waku/api/api.nim index 3493513a3..ba6f83b78 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -3,7 +3,7 @@ import chronicles, chronos, results, std/strutils import waku/factory/waku import waku/[requests/health_requests, waku_core, waku_node] import waku/node/delivery_service/send_service -import waku/node/delivery_service/subscription_service +import waku/node/delivery_service/subscription_manager import libp2p/peerid import ./[api_conf, types] @@ -36,18 +36,27 @@ proc subscribe*( ): Future[Result[void, string]] {.async.} = ?checkApiAvailability(w) - return w.deliveryService.subscriptionService.subscribe(contentTopic) + return w.deliveryService.subscriptionManager.subscribe(contentTopic) proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] = ?checkApiAvailability(w) - return w.deliveryService.subscriptionService.unsubscribe(contentTopic) + return w.deliveryService.subscriptionManager.unsubscribe(contentTopic) proc send*( w: Waku, envelope: MessageEnvelope ): Future[Result[RequestId, string]] {.async.} = ?checkApiAvailability(w) + let isSubbed = w.deliveryService.subscriptionManager + .isSubscribed(envelope.contentTopic) + .valueOr(false) + if not isSubbed: + info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic + w.deliveryService.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: + warn "Failed to auto-subscribe", error = error + return err("Failed to auto-subscribe before sending: " & error) + let requestId = RequestId.new(w.rng) let deliveryTask = DeliveryTask.new(requestId, envelope, w.brokerCtx).valueOr: diff --git a/waku/events/message_events.nim b/waku/events/message_events.nim index cf3dac9b7..677a4a433 100644 --- a/waku/events/message_events.nim +++ b/waku/events/message_events.nim @@ -1,6 +1,4 @@ -import waku/common/broker/event_broker -import waku/api/types -import waku/waku_core/message +import waku/[api/types, waku_core/message, waku_core/topics, common/broker/event_broker] export types @@ -28,3 +26,9 @@ EventBroker: type MessageReceivedEvent* = object messageHash*: string message*: WakuMessage + +EventBroker: + # Internal event emitted when a message arrives from the network via any protocol + type MessageSeenEvent* = object + topic*: PubsubTopic + message*: WakuMessage diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 9803a53a9..ff0ab0568 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -35,6 +35,7 @@ import node/health_monitor, node/waku_metrics, node/delivery_service/delivery_service, + node/delivery_service/subscription_manager, rest_api/message_cache, rest_api/endpoint/server, rest_api/endpoint/builder as rest_server_builder, @@ -453,7 +454,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: ).isOkOr: error "Failed to set RequestProtocolHealth provider", error = error - ## Setup RequestHealthReport provider (The lost child) + ## Setup RequestHealthReport provider RequestHealthReport.setProvider( globalBrokerContext(), @@ -514,6 +515,10 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = if not waku.wakuDiscv5.isNil(): await waku.wakuDiscv5.stop() + if not waku.deliveryService.isNil(): + await waku.deliveryService.stopDeliveryService() + waku.deliveryService = nil + if not waku.node.isNil(): await waku.node.stop() diff --git a/waku/node/delivery_service/delivery_service.nim b/waku/node/delivery_service/delivery_service.nim index 8106cba9f..258c01e95 100644 --- a/waku/node/delivery_service/delivery_service.nim +++ b/waku/node/delivery_service/delivery_service.nim @@ -5,7 +5,7 @@ import chronos import ./recv_service, ./send_service, - ./subscription_service, + ./subscription_manager, waku/[ waku_core, waku_node, @@ -18,29 +18,31 @@ import type DeliveryService* = ref object sendService*: SendService recvService: RecvService - subscriptionService*: SubscriptionService + subscriptionManager*: SubscriptionManager proc new*( T: type DeliveryService, useP2PReliability: bool, w: WakuNode ): Result[T, string] = ## storeClient is needed to give store visitility to DeliveryService ## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendService to re-publish - let subscriptionService = SubscriptionService.new(w) - let sendService = ?SendService.new(useP2PReliability, w, subscriptionService) - let recvService = RecvService.new(w, subscriptionService) + let subscriptionManager = SubscriptionManager.new(w) + let sendService = ?SendService.new(useP2PReliability, w, subscriptionManager) + let recvService = RecvService.new(w, subscriptionManager) return ok( DeliveryService( sendService: sendService, recvService: recvService, - subscriptionService: subscriptionService, + subscriptionManager: subscriptionManager, ) ) proc startDeliveryService*(self: DeliveryService) = - self.sendService.startSendService() + self.subscriptionManager.startSubscriptionManager() self.recvService.startRecvService() + self.sendService.startSendService() proc stopDeliveryService*(self: DeliveryService) {.async.} = - self.sendService.stopSendService() + await self.sendService.stopSendService() await self.recvService.stopRecvService() + await self.subscriptionManager.stopSubscriptionManager() diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 12780033a..0eba2c450 100644 --- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -2,9 +2,9 @@ ## receive and is backed by store-v3 requests to get an additional degree of certainty ## -import std/[tables, sequtils, options] +import std/[tables, sequtils, options, sets] import chronos, chronicles, libp2p/utility -import ../[subscription_service] +import ../[subscription_manager] import waku/[ waku_core, @@ -13,6 +13,7 @@ import waku_filter_v2/client, waku_core/topics, events/delivery_events, + events/message_events, waku_node, common/broker/broker_context, ] @@ -35,14 +36,9 @@ type RecvMessage = object type RecvService* = ref object of RootObj brokerCtx: BrokerContext - topicsInterest: Table[PubsubTopic, seq[ContentTopic]] - ## Tracks message verification requests and when was the last time a - ## pubsub topic was verified for missing messages - ## The key contains pubsub-topics node: WakuNode - onSubscribeListener: OnFilterSubscribeEventListener - onUnsubscribeListener: OnFilterUnsubscribeEventListener - subscriptionService: SubscriptionService + seenMsgListener: MessageSeenEventListener + subscriptionManager: SubscriptionManager recentReceivedMsgs: seq[RecvMessage] @@ -95,20 +91,20 @@ proc msgChecker(self: RecvService) {.async.} = self.endTimeToCheck = getNowInNanosecondTime() var msgHashesInStore = newSeq[WakuMessageHash](0) - for pubsubTopic, cTopics in self.topicsInterest.pairs: + for sub in self.subscriptionManager.getActiveSubscriptions(): let storeResp: StoreQueryResponse = ( await self.node.wakuStoreClient.queryToAny( StoreQueryRequest( includeData: false, - pubsubTopic: some(PubsubTopic(pubsubTopic)), - contentTopics: cTopics, + pubsubTopic: some(PubsubTopic(sub.pubsubTopic)), + contentTopics: sub.contentTopics, startTime: some(self.startTimeToCheck - DelayExtra.nanos), endTime: some(self.endTimeToCheck + DelayExtra.nanos), ) ) ).valueOr: error "msgChecker failed to get remote msgHashes", - pubsubTopic, cTopics, error = $error + pubsubTopic = sub.pubsubTopic, cTopics = sub.contentTopics, error = $error continue msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash)) @@ -133,31 +129,20 @@ proc msgChecker(self: RecvService) {.async.} = ## update next check times self.startTimeToCheck = self.endTimeToCheck -proc onSubscribe( - self: RecvService, pubsubTopic: string, contentTopics: seq[string] -) {.gcsafe, raises: [].} = - info "onSubscribe", pubsubTopic, contentTopics - self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest): - contentTopicsOfInterest[].add(contentTopics) - do: - self.topicsInterest[pubsubTopic] = contentTopics +proc processIncomingMessageOfInterest( + self: RecvService, pubsubTopic: string, message: WakuMessage +) = + ## Resolve an incoming network message that was already filtered by topic. + ## Deduplicate (by hash), store (saves in recently-seen messages) and emit + ## the MAPI MessageReceivedEvent for every unique incoming message. -proc onUnsubscribe( - self: RecvService, pubsubTopic: string, contentTopics: seq[string] -) {.gcsafe, raises: [].} = - info "onUnsubscribe", pubsubTopic, contentTopics + let msgHash = computeMessageHash(pubsubTopic, message) + if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash): + let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) + self.recentReceivedMsgs.add(rxMsg) + MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message) - self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest): - let remainingCTopics = - contentTopicsOfInterest[].filterIt(not contentTopics.contains(it)) - contentTopicsOfInterest[] = remainingCTopics - - if remainingCTopics.len == 0: - self.topicsInterest.del(pubsubTopic) - do: - error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics - -proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T = +proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T = ## The storeClient will help to acquire any possible missed messages let now = getNowInNanosecondTime() @@ -165,22 +150,13 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T = node: node, startTimeToCheck: now, brokerCtx: node.brokerCtx, - subscriptionService: s, - topicsInterest: initTable[PubsubTopic, seq[ContentTopic]](), + subscriptionManager: s, recentReceivedMsgs: @[], ) - if not node.wakuFilterClient.isNil(): - let filterPushHandler = proc( - pubsubTopic: PubsubTopic, message: WakuMessage - ) {.async, closure.} = - ## Captures all the messages received through filter - - let msgHash = computeMessageHash(pubSubTopic, message) - let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) - recvService.recentReceivedMsgs.add(rxMsg) - - node.wakuFilterClient.registerPushHandler(filterPushHandler) + # TODO: For MAPI Edge support, either call node.wakuFilterClient.registerPushHandler + # so that the RecvService listens to incoming filter messages, + # or have the filter client emit MessageSeenEvent. return recvService @@ -194,26 +170,26 @@ proc startRecvService*(self: RecvService) = self.msgCheckerHandler = self.msgChecker() self.msgPrunerHandler = self.loopPruneOldMessages() - self.onSubscribeListener = OnFilterSubscribeEvent.listen( + self.seenMsgListener = MessageSeenEvent.listen( self.brokerCtx, - proc(subsEv: OnFilterSubscribeEvent) {.async: (raises: []).} = - self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics), - ).valueOr: - error "Failed to set OnFilterSubscribeEvent listener", error = error - quit(QuitFailure) + proc(event: MessageSeenEvent) {.async: (raises: []).} = + if not self.subscriptionManager.isSubscribed( + event.topic, event.message.contentTopic + ): + trace "skipping message as I am not subscribed", + shard = event.topic, contenttopic = event.message.contentTopic + return - self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen( - self.brokerCtx, - proc(subsEv: OnFilterUnsubscribeEvent) {.async: (raises: []).} = - self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics), + self.processIncomingMessageOfInterest(event.topic, event.message), ).valueOr: - error "Failed to set OnFilterUnsubscribeEvent listener", error = error + error "Failed to set MessageSeenEvent listener", error = error quit(QuitFailure) proc stopRecvService*(self: RecvService) {.async.} = - OnFilterSubscribeEvent.dropListener(self.brokerCtx, self.onSubscribeListener) - OnFilterUnsubscribeEvent.dropListener(self.brokerCtx, self.onUnsubscribeListener) + MessageSeenEvent.dropListener(self.brokerCtx, self.seenMsgListener) if not self.msgCheckerHandler.isNil(): await self.msgCheckerHandler.cancelAndWait() + self.msgCheckerHandler = nil if not self.msgPrunerHandler.isNil(): await self.msgPrunerHandler.cancelAndWait() + self.msgPrunerHandler = nil diff --git a/waku/node/delivery_service/send_service/send_service.nim b/waku/node/delivery_service/send_service/send_service.nim index a41d07786..a3c44bc0c 100644 --- a/waku/node/delivery_service/send_service/send_service.nim +++ b/waku/node/delivery_service/send_service/send_service.nim @@ -5,7 +5,7 @@ import std/[sequtils, tables, options] import chronos, chronicles, libp2p/utility import ./[send_processor, relay_processor, lightpush_processor, delivery_task], - ../[subscription_service], + ../[subscription_manager], waku/[ waku_core, node/waku_node, @@ -58,7 +58,7 @@ type SendService* = ref object of RootObj node: WakuNode checkStoreForMessages: bool - subscriptionService: SubscriptionService + subscriptionManager: SubscriptionManager proc setupSendProcessorChain( peerManager: PeerManager, @@ -99,7 +99,7 @@ proc new*( T: typedesc[SendService], preferP2PReliability: bool, w: WakuNode, - s: SubscriptionService, + s: SubscriptionManager, ): Result[T, string] = if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil(): return err( @@ -120,7 +120,7 @@ proc new*( sendProcessor: sendProcessorChain, node: w, checkStoreForMessages: checkStoreForMessages, - subscriptionService: s, + subscriptionManager: s, ) return ok(sendService) @@ -250,9 +250,9 @@ proc serviceLoop(self: SendService) {.async.} = proc startSendService*(self: SendService) = self.serviceLoopHandle = self.serviceLoop() -proc stopSendService*(self: SendService) = +proc stopSendService*(self: SendService) {.async.} = if not self.serviceLoopHandle.isNil(): - discard self.serviceLoopHandle.cancelAndWait() + await self.serviceLoopHandle.cancelAndWait() proc send*(self: SendService, task: DeliveryTask) {.async.} = assert(not task.isNil(), "task for send must not be nil") @@ -260,7 +260,7 @@ proc send*(self: SendService, task: DeliveryTask) {.async.} = info "SendService.send: processing delivery task", requestId = task.requestId, msgHash = task.msgHash.to0xHex() - self.subscriptionService.subscribe(task.msg.contentTopic).isOkOr: + self.subscriptionManager.subscribe(task.msg.contentTopic).isOkOr: error "SendService.send: failed to subscribe to content topic", contentTopic = task.msg.contentTopic, error = error diff --git a/waku/node/delivery_service/subscription_manager.nim b/waku/node/delivery_service/subscription_manager.nim new file mode 100644 index 000000000..22df47413 --- /dev/null +++ b/waku/node/delivery_service/subscription_manager.nim @@ -0,0 +1,164 @@ +import std/[sets, tables, options, strutils], chronos, chronicles, results +import + waku/[ + waku_core, + waku_core/topics, + waku_core/topics/sharding, + waku_node, + waku_relay, + common/broker/broker_context, + events/delivery_events, + ] + +type SubscriptionManager* = ref object of RootObj + node: WakuNode + contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]] + ## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only. + ## A present key with an empty HashSet value means pubsubtopic already subscribed + ## (via subscribePubsubTopics()) but there's no specific content topic interest yet. + +proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T = + SubscriptionManager( + node: node, contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]]() + ) + +proc addContentTopicInterest( + self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic +): Result[void, string] = + if not self.contentTopicSubs.hasKey(shard): + self.contentTopicSubs[shard] = initHashSet[ContentTopic]() + + self.contentTopicSubs.withValue(shard, cTopics): + if not cTopics[].contains(topic): + cTopics[].incl(topic) + + # TODO: Call a "subscribe(shard, topic)" on filter client here, + # so the filter client can know that subscriptions changed. + + return ok() + +proc removeContentTopicInterest( + self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic +): Result[void, string] = + self.contentTopicSubs.withValue(shard, cTopics): + if cTopics[].contains(topic): + cTopics[].excl(topic) + + if cTopics[].len == 0 and isNil(self.node.wakuRelay): + self.contentTopicSubs.del(shard) # We're done with cTopics here + + # TODO: Call a "unsubscribe(shard, topic)" on filter client here, + # so the filter client can know that subscriptions changed. + + return ok() + +proc subscribePubsubTopics( + self: SubscriptionManager, shards: seq[PubsubTopic] +): Result[void, string] = + if isNil(self.node.wakuRelay): + return err("subscribePubsubTopics requires a Relay") + + var errors: seq[string] = @[] + + for shard in shards: + if not self.contentTopicSubs.hasKey(shard): + self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr: + errors.add("shard " & shard & ": " & error) + continue + + self.contentTopicSubs[shard] = initHashSet[ContentTopic]() + + if errors.len > 0: + return err("subscribeShard errors: " & errors.join("; ")) + + return ok() + +proc startSubscriptionManager*(self: SubscriptionManager) = + if isNil(self.node.wakuRelay): + return + + if self.node.wakuAutoSharding.isSome(): + # Subscribe relay to all shards in autosharding. + let autoSharding = self.node.wakuAutoSharding.get() + let clusterId = autoSharding.clusterId + let numShards = autoSharding.shardCountGenZero + + if numShards > 0: + var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards) + + for i in 0 ..< numShards: + let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i)) + clusterPubsubTopics.add(PubsubTopic($shardObj)) + + self.subscribePubsubTopics(clusterPubsubTopics).isOkOr: + error "Failed to auto-subscribe Relay to cluster shards: ", error = error + else: + info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe." + +proc stopSubscriptionManager*(self: SubscriptionManager) {.async.} = + discard + +proc getActiveSubscriptions*( + self: SubscriptionManager +): seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] = + var activeSubs: seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] = + @[] + + for pubsub, cTopicSet in self.contentTopicSubs.pairs: + if cTopicSet.len > 0: + var cTopicSeq = newSeqOfCap[ContentTopic](cTopicSet.len) + for t in cTopicSet: + cTopicSeq.add(t) + activeSubs.add((pubsub, cTopicSeq)) + + return activeSubs + +proc getShardForContentTopic( + self: SubscriptionManager, topic: ContentTopic +): Result[PubsubTopic, string] = + if self.node.wakuAutoSharding.isSome(): + let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic) + return ok($shardObj) + + return err("SubscriptionManager requires AutoSharding") + +proc isSubscribed*( + self: SubscriptionManager, topic: ContentTopic +): Result[bool, string] = + let shard = ?self.getShardForContentTopic(topic) + return ok( + self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(topic) + ) + +proc isSubscribed*( + self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic +): bool {.raises: [].} = + self.contentTopicSubs.withValue(shard, cTopics): + return cTopics[].contains(contentTopic) + return false + +proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] = + if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient): + return err("SubscriptionManager requires either Relay or Filter Client.") + + let shard = ?self.getShardForContentTopic(topic) + + if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard): + ?self.subscribePubsubTopics(@[shard]) + + ?self.addContentTopicInterest(shard, topic) + + return ok() + +proc unsubscribe*( + self: SubscriptionManager, topic: ContentTopic +): Result[void, string] = + if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient): + return err("SubscriptionManager requires either Relay or Filter Client.") + + let shard = ?self.getShardForContentTopic(topic) + + if self.isSubscribed(shard, topic): + ?self.removeContentTopicInterest(shard, topic) + + return ok() diff --git a/waku/node/delivery_service/subscription_service.nim b/waku/node/delivery_service/subscription_service.nim deleted file mode 100644 index 78763161b..000000000 --- a/waku/node/delivery_service/subscription_service.nim +++ /dev/null @@ -1,64 +0,0 @@ -import chronos, chronicles -import - waku/[ - waku_core, - waku_core/topics, - events/message_events, - waku_node, - common/broker/broker_context, - ] - -type SubscriptionService* = ref object of RootObj - brokerCtx: BrokerContext - node: WakuNode - -proc new*(T: typedesc[SubscriptionService], node: WakuNode): T = - ## The storeClient will help to acquire any possible missed messages - - return SubscriptionService(brokerCtx: node.brokerCtx, node: node) - -proc isSubscribed*( - self: SubscriptionService, topic: ContentTopic -): Result[bool, string] = - var isSubscribed = false - if self.node.wakuRelay.isNil() == false: - return self.node.isSubscribed((kind: ContentSub, topic: topic)) - - # TODO: Add support for edge mode with Filter subscription management - return ok(isSubscribed) - -#TODO: later PR may consider to refactor or place this function elsewhere -# The only important part is that it emits MessageReceivedEvent -proc getReceiveHandler(self: SubscriptionService): WakuRelayHandler = - return proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = - let msgHash = computeMessageHash(topic, msg).to0xHex() - info "API received message", - pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash - - MessageReceivedEvent.emit(self.brokerCtx, msgHash, msg) - -proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] = - let isSubscribed = self.isSubscribed(topic).valueOr: - error "Failed to check subscription status: ", error = error - return err("Failed to check subscription status: " & error) - - if isSubscribed == false: - if self.node.wakuRelay.isNil() == false: - self.node.subscribe((kind: ContentSub, topic: topic), self.getReceiveHandler()).isOkOr: - error "Failed to subscribe: ", error = error - return err("Failed to subscribe: " & error) - - # TODO: Add support for edge mode with Filter subscription management - - return ok() - -proc unsubscribe*( - self: SubscriptionService, topic: ContentTopic -): Result[void, string] = - if self.node.wakuRelay.isNil() == false: - self.node.unsubscribe((kind: ContentSub, topic: topic)).isOkOr: - error "Failed to unsubscribe: ", error = error - return err("Failed to unsubscribe: " & error) - - # TODO: Add support for edge mode with Filter subscription management - return ok() diff --git a/waku/node/kernel_api/relay.nim b/waku/node/kernel_api/relay.nim index a0a128449..ec4d05ddd 100644 --- a/waku/node/kernel_api/relay.nim +++ b/waku/node/kernel_api/relay.nim @@ -19,16 +19,20 @@ import libp2p/utility import - ../waku_node, - ../../waku_relay, - ../../waku_core, - ../../waku_core/topics/sharding, - ../../waku_filter_v2, - ../../waku_archive_legacy, - ../../waku_archive, - ../../waku_store_sync, - ../peer_manager, - ../../waku_rln_relay + waku/[ + waku_relay, + waku_core, + waku_core/topics/sharding, + waku_filter_v2, + waku_archive_legacy, + waku_archive, + waku_store_sync, + waku_rln_relay, + node/waku_node, + node/peer_manager, + common/broker/broker_context, + events/message_events, + ] export waku_relay.WakuRelayHandler @@ -44,14 +48,25 @@ logScope: ## Waku relay proc registerRelayHandler( - node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler -) = + node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler = nil +): bool = ## Registers the only handler for the given topic. ## Notice that this handler internally calls other handlers, such as filter, ## archive, etc, plus the handler provided by the application. + ## Returns `true` if a mesh subscription was created or `false` if the relay + ## was already subscribed to the topic. - if node.wakuRelay.isSubscribed(topic): - return + let alreadySubscribed = node.wakuRelay.isSubscribed(topic) + + if not appHandler.isNil(): + if not alreadySubscribed or not node.legacyAppHandlers.hasKey(topic): + node.legacyAppHandlers[topic] = appHandler + else: + debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler", + topic = topic + + if alreadySubscribed: + return false proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = let msgSizeKB = msg.payload.len / 1000 @@ -82,6 +97,9 @@ proc registerRelayHandler( node.wakuStoreReconciliation.messageIngress(topic, msg) + proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + MessageSeenEvent.emit(node.brokerCtx, topic, msg) + let uniqueTopicHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = @@ -89,7 +107,15 @@ proc registerRelayHandler( await filterHandler(topic, msg) await archiveHandler(topic, msg) await syncHandler(topic, msg) - await appHandler(topic, msg) + await internalHandler(topic, msg) + + # Call the legacy (kernel API) app handler if it exists. + # Normally, hasKey is false and the MessageSeenEvent bus (new API) is used instead. + # But we need to support legacy behavior (kernel API use), hence this. + # NOTE: We can delete `legacyAppHandlers` if instead we refactor WakuRelay to support multiple + # PubsubTopic handlers, since that's actually supported by libp2p PubSub (bigger refactor...) + if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil(): + await node.legacyAppHandlers[topic](topic, msg) node.wakuRelay.subscribe(topic, uniqueTopicHandler) @@ -115,8 +141,11 @@ proc subscribe*( ): Result[void, string] = ## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on ## this topic. WakuRelayHandler is a method that takes a topic and a Waku message. + ## If `handler` is nil, the API call will subscribe to the topic in the relay mesh + ## but no app handler will be registered at this time (it can be registered later with + ## another call to this proc for the same gossipsub topic). - if node.wakuRelay.isNil(): + if isNil(node.wakuRelay): error "Invalid API call to `subscribe`. WakuRelay not mounted." return err("Invalid API call to `subscribe`. WakuRelay not mounted.") @@ -124,13 +153,15 @@ proc subscribe*( error "Failed to decode subscription event", error = error return err("Failed to decode subscription event: " & error) - if node.wakuRelay.isSubscribed(pubsubTopic): - warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic - return ok() - - info "subscribe", pubsubTopic, contentTopicOp - node.registerRelayHandler(pubsubTopic, handler) - node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) + if node.registerRelayHandler(pubsubTopic, handler): + info "subscribe", pubsubTopic, contentTopicOp + node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) + else: + if isNil(handler): + warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic + else: + info "subscribe (was already subscribed in the mesh; appHandler set)", + pubsubTopic = pubsubTopic return ok() @@ -138,8 +169,10 @@ proc unsubscribe*( node: WakuNode, subscription: SubscriptionEvent ): Result[void, string] = ## Unsubscribes from a specific PubSub or Content topic. + ## This will both unsubscribe from the relay mesh and remove the app handler, if any. + ## NOTE: This works because using MAPI and Kernel API at the same time is unsupported. - if node.wakuRelay.isNil(): + if isNil(node.wakuRelay): error "Invalid API call to `unsubscribe`. WakuRelay not mounted." return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.") @@ -147,13 +180,20 @@ proc unsubscribe*( error "Failed to decode unsubscribe event", error = error return err("Failed to decode unsubscribe event: " & error) - if not node.wakuRelay.isSubscribed(pubsubTopic): - warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic - return ok() + let hadHandler = node.legacyAppHandlers.hasKey(pubsubTopic) + if hadHandler: + node.legacyAppHandlers.del(pubsubTopic) - info "unsubscribe", pubsubTopic, contentTopicOp - node.wakuRelay.unsubscribe(pubsubTopic) - node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) + if node.wakuRelay.isSubscribed(pubsubTopic): + info "unsubscribe", pubsubTopic, contentTopicOp + node.wakuRelay.unsubscribe(pubsubTopic) + node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) + else: + if not hadHandler: + warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic + else: + info "unsubscribe (was not subscribed in the mesh; appHandler removed)", + pubsubTopic = pubsubTopic return ok() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 254387c32..0cef4cc5d 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -146,6 +146,8 @@ type started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] rateLimitSettings*: ProtocolRateLimitSettings + legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler] + ## Kernel API Relay appHandlers (if any) wakuMix*: WakuMix edgeTopicsHealth*: Table[PubsubTopic, TopicHealth] edgeHealthEvent*: AsyncEvent diff --git a/waku/waku_core/subscription/subscription_manager.nim b/waku/waku_core/subscription/subscription_manager.nim index 1b950b3b4..ccade763b 100644 --- a/waku/waku_core/subscription/subscription_manager.nim +++ b/waku/waku_core/subscription/subscription_manager.nim @@ -5,19 +5,19 @@ import std/tables, results, chronicles, chronos import ./push_handler, ../topics, ../message ## Subscription manager -type SubscriptionManager* = object +type LegacySubscriptionManager* = object subscriptions: TableRef[(string, ContentTopic), FilterPushHandler] -proc init*(T: type SubscriptionManager): T = - SubscriptionManager( +proc init*(T: type LegacySubscriptionManager): T = + LegacySubscriptionManager( subscriptions: newTable[(string, ContentTopic), FilterPushHandler]() ) -proc clear*(m: var SubscriptionManager) = +proc clear*(m: var LegacySubscriptionManager) = m.subscriptions.clear() proc registerSubscription*( - m: SubscriptionManager, + m: LegacySubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, handler: FilterPushHandler, @@ -29,12 +29,12 @@ proc registerSubscription*( error "failed to register filter subscription", error = getCurrentExceptionMsg() proc removeSubscription*( - m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic + m: LegacySubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic ) = m.subscriptions.del((pubsubTopic, contentTopic)) proc notifySubscriptionHandler*( - m: SubscriptionManager, + m: LegacySubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, message: WakuMessage, @@ -48,5 +48,5 @@ proc notifySubscriptionHandler*( except CatchableError: discard -proc getSubscriptionsCount*(m: SubscriptionManager): int = +proc getSubscriptionsCount*(m: LegacySubscriptionManager): int = m.subscriptions.len()