diff --git a/tests/api/test_all.nim b/tests/api/test_all.nim index 57f7f37f2..4617c8cdb 100644 --- a/tests/api/test_all.nim +++ b/tests/api/test_all.nim @@ -1,3 +1,8 @@ {.used.} -import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health +import + ./test_entry_nodes, + ./test_node_conf, + ./test_api_send, + ./test_api_subscription, + ./test_api_health diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim new file mode 100644 index 000000000..42b3937fc --- /dev/null +++ b/tests/api/test_api_subscription.nim @@ -0,0 +1,109 @@ +{.used.} + +import std/strutils +import chronos, testutils/unittests, stew/byteutils +import ../testlib/[common, testasync] +import + waku, waku/[waku_node, waku_core, common/broker/broker_context, events/message_events] +import waku/api/api_conf, waku/factory/waku_conf + +type ReceiveEventListenerManager = ref object + brokerCtx: BrokerContext + receivedListener: MessageReceivedEventListener + receivedFuture: Future[void] + receivedMessages: seq[WakuMessage] + +proc newReceiveEventListenerManager( + brokerCtx: BrokerContext +): ReceiveEventListenerManager = + let manager = ReceiveEventListenerManager(brokerCtx: brokerCtx, receivedMessages: @[]) + manager.receivedFuture = newFuture[void]("receivedEvent") + + manager.receivedListener = MessageReceivedEvent.listen( + brokerCtx, + proc(event: MessageReceivedEvent) {.async: (raises: []).} = + manager.receivedMessages.add(event.message) + echo "RECEIVED EVENT TRIGGERED: contentTopic=", event.message.contentTopic + + if not manager.receivedFuture.finished(): + manager.receivedFuture.complete() + , + ).valueOr: + raiseAssert error + + return manager + +proc teardown(manager: ReceiveEventListenerManager) = + MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener) + +proc waitForEvent( + manager: ReceiveEventListenerManager, timeout: Duration +): Future[bool] {.async.} = + return await manager.receivedFuture.withTimeout(timeout) + +proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig = + let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0) + result = NodeConfig.init( + mode = mode, + protocolsConfig = ProtocolsConfig.init( + entryNodes = @[], + clusterId = 1, + autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1), + ), + networkingConfig = netConf, + p2pReliability = true, + ) + +suite "Waku API - Subscription Service": + asyncTest "Subscription API, two relays with subscribe and receive message": + var node1, node2: Waku + + lockNewGlobalBrokerContext: + node1 = (await createNode(createApiNodeConf())).valueOr: + raiseAssert error + (await startWaku(addr node1)).isOkOr: + raiseAssert "Failed to start node1" + + lockNewGlobalBrokerContext: + node2 = (await createNode(createApiNodeConf())).valueOr: + raiseAssert error + (await startWaku(addr node2)).isOkOr: + raiseAssert "Failed to start node2" + + let node2PeerInfo = node2.node.peerInfo.toRemotePeerInfo() + await node1.node.connectToNodes(@[node2PeerInfo]) + + await sleepAsync(2.seconds) + + let testTopic = ContentTopic("/waku/2/test-content/proto") + + (await node1.subscribe(testTopic)).isOkOr: + raiseAssert "Node1 failed to subscribe: " & error + + (await node2.subscribe(testTopic)).isOkOr: + raiseAssert "Node2 failed to subscribe: " & error + + await sleepAsync(2.seconds) + + let eventManager = newReceiveEventListenerManager(node2.brokerCtx) + defer: + eventManager.teardown() + + let envelope = MessageEnvelope.init(testTopic, "hello world payload") + let sendResult = await node1.send(envelope) + check sendResult.isOk() + + const eventTimeout = 5.seconds + let receivedInTime = await eventManager.waitForEvent(eventTimeout) + + check receivedInTime == true + check eventManager.receivedMessages.len == 1 + + let receivedMsg = eventManager.receivedMessages[0] + check receivedMsg.contentTopic == testTopic + check string.fromBytes(receivedMsg.payload) == "hello world payload" + + (await node1.stop()).isOkOr: + raiseAssert error + (await node2.stop()).isOkOr: + raiseAssert error diff --git a/waku/api/api.nim b/waku/api/api.nim index 3493513a3..6b2535d4d 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -48,6 +48,15 @@ proc send*( ): Future[Result[RequestId, string]] {.async.} = ?checkApiAvailability(w) + let isSubbed = w.deliveryService.subscriptionService + .isSubscribed(envelope.contentTopic) + .valueOr(false) + if not isSubbed: + info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic + let subRes = w.deliveryService.subscriptionService.subscribe(envelope.contentTopic) + if subRes.isErr(): + warn "Failed to auto-subscribe", error = subRes.error + let requestId = RequestId.new(w.rng) let deliveryTask = DeliveryTask.new(requestId, envelope, w.brokerCtx).valueOr: diff --git a/waku/node/delivery_service/subscription_service.nim b/waku/node/delivery_service/subscription_service.nim index 78763161b..6aa847ff9 100644 --- a/waku/node/delivery_service/subscription_service.nim +++ b/waku/node/delivery_service/subscription_service.nim @@ -1,64 +1,167 @@ -import chronos, chronicles +import std/[sets, tables, options, strutils], chronos, chronicles, results import waku/[ waku_core, waku_core/topics, + waku_core/topics/sharding, events/message_events, waku_node, + waku_relay, common/broker/broker_context, ] type SubscriptionService* = ref object of RootObj - brokerCtx: BrokerContext node: WakuNode + shardSubs: HashSet[PubsubTopic] + contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]] + relayHandler: WakuRelayHandler proc new*(T: typedesc[SubscriptionService], node: WakuNode): T = - ## The storeClient will help to acquire any possible missed messages + let service = SubscriptionService( + node: node, + shardSubs: initHashSet[PubsubTopic](), + contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]](), + ) - return SubscriptionService(brokerCtx: node.brokerCtx, node: node) + service.relayHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + if not service.contentTopicSubs.hasKey(topic) or + not service.contentTopicSubs[topic].contains(msg.contentTopic): + return + + let msgHash = computeMessageHash(topic, msg).to0xHex() + info "MessageReceivedEvent", + pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash + + MessageReceivedEvent.emit(service.node.brokerCtx, msgHash, msg) + + return service + +proc getShardForContentTopic( + self: SubscriptionService, topic: ContentTopic +): Result[PubsubTopic, string] = + if self.node.wakuAutoSharding.isSome(): + let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic) + return ok($shardObj) + + return + err("Manual sharding is not supported in this API. Autosharding must be enabled.") + +proc doSubscribe(self: SubscriptionService, shard: PubsubTopic): Result[void, string] = + self.node.subscribe((kind: PubsubSub, topic: shard), self.relayHandler).isOkOr: + error "Failed to subscribe to Relay shard", shard = shard, error = error + return err("Failed to subscribe: " & error) + return ok() + +proc doUnsubscribe( + self: SubscriptionService, shard: PubsubTopic +): Result[void, string] = + self.node.unsubscribe((kind: PubsubUnsub, topic: shard)).isOkOr: + error "Failed to unsubscribe from Relay shard", shard = shard, error = error + return err("Failed to unsubscribe: " & error) + return ok() proc isSubscribed*( self: SubscriptionService, topic: ContentTopic ): Result[bool, string] = - var isSubscribed = false - if self.node.wakuRelay.isNil() == false: - return self.node.isSubscribed((kind: ContentSub, topic: topic)) + if self.node.wakuRelay.isNil(): + return err("SubscriptionService currently only supports Relay (Core) mode.") - # TODO: Add support for edge mode with Filter subscription management - return ok(isSubscribed) + let shard = ?self.getShardForContentTopic(topic) -#TODO: later PR may consider to refactor or place this function elsewhere -# The only important part is that it emits MessageReceivedEvent -proc getReceiveHandler(self: SubscriptionService): WakuRelayHandler = - return proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = - let msgHash = computeMessageHash(topic, msg).to0xHex() - info "API received message", - pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash + if self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains( + topic + ): + return ok(true) - MessageReceivedEvent.emit(self.brokerCtx, msgHash, msg) + return ok(false) proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] = - let isSubscribed = self.isSubscribed(topic).valueOr: - error "Failed to check subscription status: ", error = error - return err("Failed to check subscription status: " & error) + if self.node.wakuRelay.isNil(): + return err("SubscriptionService currently only supports Relay (Core) mode.") - if isSubscribed == false: - if self.node.wakuRelay.isNil() == false: - self.node.subscribe((kind: ContentSub, topic: topic), self.getReceiveHandler()).isOkOr: - error "Failed to subscribe: ", error = error - return err("Failed to subscribe: " & error) + let shard = ?self.getShardForContentTopic(topic) - # TODO: Add support for edge mode with Filter subscription management + let needShardSub = + not self.shardSubs.contains(shard) and not self.contentTopicSubs.hasKey(shard) + + if needShardSub: + ?self.doSubscribe(shard) + + self.contentTopicSubs.mgetOrPut(shard, initHashSet[ContentTopic]()).incl(topic) return ok() proc unsubscribe*( self: SubscriptionService, topic: ContentTopic ): Result[void, string] = - if self.node.wakuRelay.isNil() == false: - self.node.unsubscribe((kind: ContentSub, topic: topic)).isOkOr: - error "Failed to unsubscribe: ", error = error - return err("Failed to unsubscribe: " & error) + if self.node.wakuRelay.isNil(): + return err("SubscriptionService currently only supports Relay (Core) mode.") + + let shard = ?self.getShardForContentTopic(topic) + + if self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains( + topic + ): + let isLastTopic = self.contentTopicSubs[shard].len == 1 + let needShardUnsub = isLastTopic and not self.shardSubs.contains(shard) + + if needShardUnsub: + ?self.doUnsubscribe(shard) + + self.contentTopicSubs[shard].excl(topic) + if self.contentTopicSubs[shard].len == 0: + self.contentTopicSubs.del(shard) + + return ok() + +proc subscribeShard*( + self: SubscriptionService, shards: seq[PubsubTopic] +): Result[void, string] = + if self.node.wakuRelay.isNil(): + return err("SubscriptionService currently only supports Relay (Core) mode.") + + var errors: seq[string] = @[] + + for shard in shards: + if not self.shardSubs.contains(shard): + let needShardSub = not self.contentTopicSubs.hasKey(shard) + + if needShardSub: + let res = self.doSubscribe(shard) + if res.isErr(): + errors.add("Shard " & shard & " failed: " & res.error) + continue + + self.shardSubs.incl(shard) + + if errors.len > 0: + return err("Batch subscribe had errors: " & errors.join("; ")) + + return ok() + +proc unsubscribeShard*( + self: SubscriptionService, shards: seq[PubsubTopic] +): Result[void, string] = + if self.node.wakuRelay.isNil(): + return err("SubscriptionService currently only supports Relay (Core) mode.") + + var errors: seq[string] = @[] + + for shard in shards: + if self.shardSubs.contains(shard): + let needShardUnsub = not self.contentTopicSubs.hasKey(shard) + + if needShardUnsub: + let res = self.doUnsubscribe(shard) + if res.isErr(): + errors.add("Shard " & shard & " failed: " & res.error) + continue + + self.shardSubs.excl(shard) + + if errors.len > 0: + return err("Batch unsubscribe had errors: " & errors.join("; ")) - # TODO: Add support for edge mode with Filter subscription management return ok() diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 0c435468f..6c49f46e0 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -131,6 +131,14 @@ proc protocolMatcher*(codec: string): Matcher = return match +proc peerSupportsShard*(peer: RemotePeerInfo, shardInfo: RelayShard): bool = + ## Returns true if the given peer has an ENR record with the given shard + ## or if it has the shard in its shards list (populated from metadata protocol). + + return + (peer.enr.isSome() and peer.enr.get().containsShard(shardInfo)) or + (peer.shards.len > 0 and peer.shards.contains(shardInfo.shardId)) + #~~~~~~~~~~~~~~~~~~~~~~~~~~# # Peer Storage Management # #~~~~~~~~~~~~~~~~~~~~~~~~~~# @@ -217,54 +225,60 @@ proc loadFromStorage(pm: PeerManager) {.gcsafe.} = trace "recovered peers from storage", amount = amount -proc selectPeer*( - pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic) -): Option[RemotePeerInfo] = - # Selects the best peer for a given protocol +proc selectPeers*( + pm: PeerManager, + proto: string, + amount: int, + shard: Option[PubsubTopic] = none(PubsubTopic), +): seq[RemotePeerInfo] = + # Selects the best peers for a given protocol var peers = pm.switch.peerStore.getPeersByProtocol(proto) - trace "Selecting peer from peerstore", - protocol = proto, peers, address = cast[uint](pm.switch.peerStore) + trace "Selecting peers from peerstore", + amount = amount, protocol = proto, peers, address = cast[uint](pm.switch.peerStore) if shard.isSome(): # Parse the shard from the pubsub topic to get cluster and shard ID let shardInfo = RelayShard.parse(shard.get()).valueOr: trace "Failed to parse shard from pubsub topic", topic = shard.get() - return none(RemotePeerInfo) + return @[] # Filter peers that support the requested shard - # Check both ENR (if present) and the shards field on RemotePeerInfo - peers.keepItIf( - # Check ENR if available - (it.enr.isSome() and it.enr.get().containsShard(shard.get())) or - # Otherwise check the shards field directly - (it.shards.len > 0 and it.shards.contains(shardInfo.shardId)) - ) + peers.keepItIf(it.peerSupportsShard(shardInfo)) - shuffle(peers) + # For non relay protocols, we may select the peer that is slotted for the given protocol + if proto != WakuRelayCodec: + pm.serviceSlots.withValue(proto, serviceSlot): + trace "Got peer from service slots", + peerId = serviceSlot[].peerId, multi = serviceSlot[].addrs[0], protocol = proto + return @[serviceSlot[]] - # No criteria for selecting a peer for WakuRelay, random one - if proto == WakuRelayCodec: - # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned - if peers.len > 0: - trace "Got peer from peerstore", - peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto - return some(peers[0]) - trace "No peer found for protocol", protocol = proto - return none(RemotePeerInfo) - - # For other protocols, we select the peer that is slotted for the given protocol - pm.serviceSlots.withValue(proto, serviceSlot): - trace "Got peer from service slots", - peerId = serviceSlot[].peerId, multi = serviceSlot[].addrs[0], protocol = proto - return some(serviceSlot[]) - - # If not slotted, we select a random peer for the given protocol + # If no slotted peer available, select random peers for the given protocol if peers.len > 0: - trace "Got peer from peerstore", - peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto - return some(peers[0]) + # TODO: proper heuristic here that compares peer scores and selects the best ones. + shuffle(peers) + let count = min(peers.len, amount) + let selected = peers[0 ..< count] + + for i, peer in selected: + trace "Selected peer from peerstore", + peerId = peer.peerId, + multi = peer.addrs[0], + protocol = proto, + num = $(i + 1) & "/" & $count + + return selected + trace "No peer found for protocol", protocol = proto + return @[] + +proc selectPeer*( + pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic) +): Option[RemotePeerInfo] = + let peers = pm.selectPeers(proto, 1, shard) + if peers.len > 0: + return some(peers[0]) + return none(RemotePeerInfo) # Adds a peer to the service slots, which is a list of peers that are slotted for a given protocol