From d7264a798cd801607ac74a2358bf9a92f013522a Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Thu, 25 Jun 2026 04:12:55 +0200 Subject: [PATCH] waku: extract relay/filter/peer_manager API into waku/api/ Move the relay, filter and peer-manager operations out of the monolithic waku/waku.nim into focused logos_delivery/waku/api/ modules over the Waku type, with the matching node-level touch-ups. Co-Authored-By: Claude Opus 4.8 --- logos_delivery/waku/api/filter.nim | 88 ++++ logos_delivery/waku/api/peer_manager.nim | 112 +++++ logos_delivery/waku/api/relay.nim | 134 ++++++ .../waku/discovery/waku_kademlia.nim | 2 +- logos_delivery/waku/factory/app_callbacks.nim | 4 + logos_delivery/waku/factory/node_factory.nim | 2 +- .../health_monitor/node_health_monitor.nim | 4 +- .../waku/node/peer_manager/peer_manager.nim | 2 +- .../waku/node/subscription_manager.nim | 6 +- logos_delivery/waku/node/waku_node.nim | 6 +- logos_delivery/waku/node/waku_node/relay.nim | 2 +- logos_delivery/waku/waku.nim | 415 ------------------ logos_delivery/waku/waku_filter_v2/client.nim | 2 +- logos_delivery/waku/waku_relay/protocol.nim | 4 +- tests/node/test_wakunode_health_monitor.nim | 4 +- 15 files changed, 355 insertions(+), 432 deletions(-) create mode 100644 logos_delivery/waku/api/filter.nim create mode 100644 logos_delivery/waku/api/peer_manager.nim create mode 100644 logos_delivery/waku/api/relay.nim diff --git a/logos_delivery/waku/api/filter.nim b/logos_delivery/waku/api/filter.nim new file mode 100644 index 000000000..06c78213f --- /dev/null +++ b/logos_delivery/waku/api/filter.nim @@ -0,0 +1,88 @@ +## Waku layer API — filter (light client) operations. +import logos_delivery/waku/compat/option_valueor +{.push raises: [].} + +import std/options +import results, chronos, chronicles + +import logos_delivery/waku/waku +import + logos_delivery/waku/[ + waku_core, + waku_core/subscription/push_handler, + node/waku_node, + node/waku_node/filter, + node/peer_manager, + waku_filter_v2/client, + waku_filter_v2/common, + ] + +const FilterOpTimeout = 5.seconds + +proc filterSubscribe*( + self: Waku, + pubsubTopic: PubsubTopic, + contentTopics: seq[ContentTopic], + pushHandler: FilterPushHandler, +): Future[Result[bool, string]] {.async.} = + ## Registers `pushHandler` for incoming filtered messages, selects a filter + ## service peer, and subscribes. + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + self.node.wakuFilterClient.registerPushHandler(pushHandler) + + let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + return err("could not find peer with WakuFilterSubscribeCodec when subscribing") + + let subFut = self.node.filterSubscribe(some(pubsubTopic), contentTopics, peer) + if not await subFut.withTimeout(FilterOpTimeout): + return err("filter subscription timed out") + subFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc filterUnsubscribe*( + self: Waku, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic] +): Future[Result[bool, string]] {.async.} = + ## Selects a filter service peer and unsubscribes the given content topics. + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + return err("could not find peer with WakuFilterSubscribeCodec when unsubscribing") + + let unsubFut = self.node.filterUnsubscribe(some(pubsubTopic), contentTopics, peer) + if not await unsubFut.withTimeout(FilterOpTimeout): + return err("filter un-subscription timed out") + unsubFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc filterUnsubscribeAll*(self: Waku): Future[Result[bool, string]] {.async.} = + ## Selects a filter service peer and unsubscribes from everything. + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + return + err("could not find peer with WakuFilterSubscribeCodec when unsubscribing all") + + let unsubFut = self.node.filterUnsubscribeAll(peer) + if not await unsubFut.withTimeout(FilterOpTimeout): + return err("filter un-subscription all timed out") + unsubFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/api/peer_manager.nim b/logos_delivery/waku/api/peer_manager.nim new file mode 100644 index 000000000..26de46594 --- /dev/null +++ b/logos_delivery/waku/api/peer_manager.nim @@ -0,0 +1,112 @@ +## Waku layer API — peer management operations. +{.push raises: [].} + +import std/[options, sequtils, strutils] +import results, chronos, chronicles +import libp2p/[peerid, peerstore] + +import logos_delivery/waku/waku +import logos_delivery/waku/[waku_core, node/waku_node, node/peer_manager] + +type PeerConnInfo* = object ## structured connected-peer info for the api boundary + peerId*: string + protocols*: seq[string] + addresses*: seq[string] + +proc connect*( + self: Waku, peers: seq[string], timeoutMs: uint32 +): Future[Result[bool, string]] {.async.} = + try: + await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static") + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc disconnectPeerById*( + self: Waku, peerId: string +): Future[Result[bool, string]] {.async.} = + try: + let pId = PeerId.init(peerId).valueOr: + return err($error) + await self.node.peerManager.disconnectNode(pId) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} = + try: + await self.node.peerManager.disconnectAllPeers() + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc dialPeer*( + self: Waku, peerAddr: string, protocol: string, timeoutMs: int +): Future[Result[bool, string]] {.async.} = + try: + let remotePeerInfo = parsePeerInfo(peerAddr).valueOr: + return err($error) + let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol) + if conn.isNone(): + return err("failed dialing peer") + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc dialPeerById*( + self: Waku, peerId: string, protocol: string, timeoutMs: int +): Future[Result[bool, string]] {.async.} = + try: + let pId = PeerId.init(peerId).valueOr: + return err($error) + let conn = await self.node.peerManager.dialPeer(pId, protocol) + if conn.isNone(): + return err("failed dialing peer") + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId)) + except CatchableError as e: + return err(e.msg) + +proc connectedPeersInfo*( + self: Waku +): Future[Result[seq[PeerConnInfo], string]] {.async.} = + ## Structured info (protocols, addresses) for every connected peer. + try: + var infos: seq[PeerConnInfo] + for peer in self.node.peerManager.switch.peerStore.peers(): + if peer.connectedness == Connected: + infos.add( + PeerConnInfo( + peerId: $peer.peerId, + protocols: peer.protocols, + addresses: peer.addrs.mapIt($it), + ) + ) + return ok(infos) + except CatchableError as e: + return err(e.msg) + +proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers() + return ok(concat(inPeerIds, outPeerIds).mapIt($it)) + except CatchableError as e: + return err(e.msg) + +proc peerIdsByProtocol*( + self: Waku, protocol: string +): Future[Result[seq[string], string]] {.async.} = + try: + return ok( + self.node.peerManager.switch.peerStore + .peers(protocol) + .filterIt(it.connectedness == Connected) + .mapIt($it.peerId) + ) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/api/relay.nim b/logos_delivery/waku/api/relay.nim new file mode 100644 index 000000000..b8345be83 --- /dev/null +++ b/logos_delivery/waku/api/relay.nim @@ -0,0 +1,134 @@ +## Waku layer API — relay (gossipsub) operations. +{.push raises: [].} + +import std/sequtils +import results, chronos, chronicles, secp256k1, stew/byteutils + +import logos_delivery/waku/waku +import + logos_delivery/waku/[ + waku_core, + node/waku_node, + node/waku_node/relay, + node/subscription_manager, + waku_relay/protocol, + factory/waku_conf, + factory/validator_signed, + ] + +proc relayPublish*( + self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32 +): Future[Result[string, string]] {.async.} = + ## Publishes `message` and returns its message hash (0x-hex). + try: + if self.node.wakuRelay.isNil(): + return err("relayPublish: WakuRelay not mounted") + + (await self.node.wakuRelay.publish(pubsubTopic, message)).isOkOr: + return err($error) + + return ok(computeMessageHash(pubsubTopic, message).to0xHex) + except CatchableError as e: + return err(e.msg) + +proc relaySubscribe*( + self: Waku, + pubsubTopic: PubsubTopic, + handler: WakuRelayHandler = WakuRelayHandler(nil), +): Future[Result[bool, string]] {.async.} = + ## Subscribes to `pubsubTopic`. `handler` (optional) is invoked per message; + ## pass nil to subscribe without a message callback. + try: + if self.node.wakuRelay.isNil(): + return err("relaySubscribe: WakuRelay not mounted") + + self.node.subscribe( + (kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), handler + ).isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc relayUnsubscribe*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayUnsubscribe: WakuRelay not mounted") + + self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc relayAddProtectedShard*( + self: Waku, clusterId: uint16, shardId: uint16, publicKey: string +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayAddProtectedShard: WakuRelay not mounted") + + let pubKey = SkPublicKey.fromHex(publicKey).valueOr: + return err("relayAddProtectedShard: invalid public key: " & $error) + + let protectedShard = ProtectedShard(shard: shardId, key: pubKey) + self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc relayConnectedPeers*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[seq[string], string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayConnectedPeers: WakuRelay not mounted") + + let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr: + return err($error) + + return ok(connPeers.mapIt($it)) + except CatchableError as e: + return err(e.msg) + +proc relayPeersInMesh*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[seq[string], string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayPeersInMesh: WakuRelay not mounted") + + let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr: + return err($error) + + return ok(meshPeers.mapIt($it)) + except CatchableError as e: + return err(e.msg) + +proc relayNumPeersInMesh*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[int, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayNumPeersInMesh: WakuRelay not mounted") + let n = self.node.wakuRelay.getNumPeersInMesh(pubsubTopic).valueOr: + return err($error) + return ok(n) + except CatchableError as e: + return err(e.msg) + +proc relayNumConnectedPeers*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[int, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayNumConnectedPeers: WakuRelay not mounted") + let n = self.node.wakuRelay.getNumConnectedPeers(pubsubTopic).valueOr: + return err($error) + return ok(n) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/discovery/waku_kademlia.nim b/logos_delivery/waku/discovery/waku_kademlia.nim index 9b0e43abc..e6775530c 100644 --- a/logos_delivery/waku/discovery/waku_kademlia.nim +++ b/logos_delivery/waku/discovery/waku_kademlia.nim @@ -21,7 +21,7 @@ import import logos_delivery/waku/waku_core, logos_delivery/waku/node/peer_manager, - logos_delivery/waku/events/discovery_events + logos_delivery/waku/api/events/discovery_events logScope: topics = "waku service discovery" diff --git a/logos_delivery/waku/factory/app_callbacks.nim b/logos_delivery/waku/factory/app_callbacks.nim index f1d3369be..0945b56bf 100644 --- a/logos_delivery/waku/factory/app_callbacks.nim +++ b/logos_delivery/waku/factory/app_callbacks.nim @@ -1,5 +1,9 @@ import ../waku_relay, ../node/peer_manager, ../node/health_monitor/connection_status +# Re-export the modules that define the handler types below, so that consumers +# of `AppCallbacks` (e.g. the FFI library) can construct the handlers. +export waku_relay, peer_manager, connection_status + type AppCallbacks* = ref object relayHandler*: WakuRelayHandler topicHealthChangeHandler*: TopicHealthChangeHandler diff --git a/logos_delivery/waku/factory/node_factory.nim b/logos_delivery/waku/factory/node_factory.nim index 30e37850a..c4f5ab32d 100644 --- a/logos_delivery/waku/factory/node_factory.nim +++ b/logos_delivery/waku/factory/node_factory.nim @@ -39,7 +39,7 @@ import ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, ../waku_lightpush_legacy/common, ../common/rate_limit/setting, - ../events/discovery_events + ../api/events/discovery_events ## Peer persistence diff --git a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim index 465598794..d55499d1a 100644 --- a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim +++ b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim @@ -12,8 +12,8 @@ import logos_delivery/waku/[ waku_relay, waku_rln_relay, - events/health_events, - events/peer_events, + api/events/health_events, + api/events/peer_events, node/waku_node, node/node_telemetry, node/peer_manager, diff --git a/logos_delivery/waku/node/peer_manager/peer_manager.nim b/logos_delivery/waku/node/peer_manager/peer_manager.nim index 60bd2acb1..31c6133ab 100644 --- a/logos_delivery/waku/node/peer_manager/peer_manager.nim +++ b/logos_delivery/waku/node/peer_manager/peer_manager.nim @@ -20,7 +20,7 @@ import waku_relay/protocol, waku_enr/sharding, waku_enr/capabilities, - events/peer_events, + api/events/peer_events, common/nimchronos, common/enr, common/callbacks, diff --git a/logos_delivery/waku/node/subscription_manager.nim b/logos_delivery/waku/node/subscription_manager.nim index 15b582ea6..69f67ab3b 100644 --- a/logos_delivery/waku/node/subscription_manager.nim +++ b/logos_delivery/waku/node/subscription_manager.nim @@ -15,9 +15,9 @@ import waku_filter_v2/common as filter_common, waku_filter_v2/client as filter_client, waku_filter_v2/protocol as filter_protocol, - events/health_events, - events/message_events, - events/peer_events, + api/events/health_events, + api/events/message_events, + api/events/peer_events, requests/health_requests, node/peer_manager, node/health_monitor/topic_health, diff --git a/logos_delivery/waku/node/waku_node.nim b/logos_delivery/waku/node/waku_node.nim index 2ad7dc601..a329226cc 100644 --- a/logos_delivery/waku/node/waku_node.nim +++ b/logos_delivery/waku/node/waku_node.nim @@ -59,9 +59,9 @@ import waku_mix, requests/node_requests, requests/health_requests, - events/health_events, - events/message_events, - events/peer_events, + api/events/health_events, + api/events/message_events, + api/events/peer_events, ], logos_delivery/waku/discovery/waku_kademlia, logos_delivery/waku/net/[bound_ports, net_config], diff --git a/logos_delivery/waku/node/waku_node/relay.nim b/logos_delivery/waku/node/waku_node/relay.nim index 57904dc94..f2a2772e0 100644 --- a/logos_delivery/waku/node/waku_node/relay.nim +++ b/logos_delivery/waku/node/waku_node/relay.nim @@ -32,7 +32,7 @@ import node/waku_node, node/subscription_manager, node/peer_manager, - events/message_events, + api/events/message_events, ] export waku_relay.WakuRelayHandler diff --git a/logos_delivery/waku/waku.nim b/logos_delivery/waku/waku.nim index 2c18c5f63..34e0a071e 100644 --- a/logos_delivery/waku/waku.nim +++ b/logos_delivery/waku/waku.nim @@ -63,7 +63,6 @@ logScope: # Git version in git describe format (defined at compile time) const git_version* {.strdefine.} = "n/a" -const FilterOpTimeout = 5.seconds type Waku* = ref object stateInfo*: WakuStateInfo @@ -574,418 +573,4 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = return ok() -## Kernel API realization -## -# --- topic construction --- -proc buildContentTopic*( - self: Waku, appName: string, appVersion: uint32, name: string, encoding: string -): Future[Result[ContentTopic, string]] {.async.} = - try: - return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}")) - except CatchableError as e: - return err(e.msg) - -proc buildPubsubTopic*( - self: Waku, topicName: string -): Future[Result[PubsubTopic, string]] {.async.} = - try: - return ok(PubsubTopic(fmt"/waku/2/{topicName}")) - except CatchableError as e: - return err(e.msg) - -proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} = - return ok(DefaultPubsubTopic) - -# --- relay --- -proc relayPublish*( - self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32 -): Future[Result[int, string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayPublish: WakuRelay not mounted") - - let numPeers = (await self.node.wakuRelay.publish(pubsubTopic, message)).valueOr: - return err($error) - - return ok(numPeers) - except CatchableError as e: - return err(e.msg) - -proc relaySubscribe*( - self: Waku, pubsubTopic: PubsubTopic -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relaySubscribe: WakuRelay not mounted") - - self.node.subscribe( - (kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(nil) - ).isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc relayUnsubscribe*( - self: Waku, pubsubTopic: PubsubTopic -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayUnsubscribe: WakuRelay not mounted") - - self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc relayAddProtectedShard*( - self: Waku, clusterId: uint16, shardId: uint16, publicKey: string -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayAddProtectedShard: WakuRelay not mounted") - - let pubKey = SkPublicKey.fromHex(publicKey).valueOr: - return err("relayAddProtectedShard: invalid public key: " & $error) - - let protectedShard = ProtectedShard(shard: shardId, key: pubKey) - self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId) - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc relayConnectedPeers*( - self: Waku, pubsubTopic: PubsubTopic -): Future[Result[seq[string], string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayConnectedPeers: WakuRelay not mounted") - - let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr: - return err($error) - - return ok(connPeers.mapIt($it)) - except CatchableError as e: - return err(e.msg) - -proc relayPeersInMesh*( - self: Waku, pubsubTopic: PubsubTopic -): Future[Result[seq[string], string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayPeersInMesh: WakuRelay not mounted") - - let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr: - return err($error) - - return ok(meshPeers.mapIt($it)) - except CatchableError as e: - return err(e.msg) - -# --- filter --- -proc filterSubscribe*( - self: Waku, - pubsubTopic: Option[PubsubTopic], - contentTopics: seq[ContentTopic], - peer: string, -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuFilterClient.isNil(): - return err("wakuFilterClient is not mounted") - - let subFut = self.node.filterSubscribe(pubsubTopic, contentTopics, peer) - if not await subFut.withTimeout(FilterOpTimeout): - return err("filter subscription timed out") - subFut.read().isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc filterUnsubscribe*( - self: Waku, - pubsubTopic: Option[PubsubTopic], - contentTopics: seq[ContentTopic], - peer: string, -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuFilterClient.isNil(): - return err("wakuFilterClient is not mounted") - - let unsubFut = self.node.filterUnsubscribe(pubsubTopic, contentTopics, peer) - if not await unsubFut.withTimeout(FilterOpTimeout): - return err("filter un-subscription timed out") - unsubFut.read().isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc filterUnsubscribeAll*( - self: Waku, peer: string -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuFilterClient.isNil(): - return err("wakuFilterClient is not mounted") - - let unsubFut = self.node.filterUnsubscribeAll(peer) - if not await unsubFut.withTimeout(FilterOpTimeout): - return err("filter un-subscription all timed out") - unsubFut.read().isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -# --- lightpush --- -proc lightpushPublish*( - self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string -): Future[Result[string, string]] {.async.} = - try: - if self.node.wakuLegacyLightpushClient.isNil(): - return err("wakuLegacyLightpushClient is not mounted") - - let remotePeer = parsePeerInfo(peer).valueOr: - return err("lightpushPublish failed to parse peer addr: " & $error) - - let msgHashHex = ( - await self.node.wakuLegacyLightpushClient.publish( - pubsubTopic, message, remotePeer - ) - ).valueOr: - return err($error) - - return ok(msgHashHex) - except CatchableError as e: - return err(e.msg) - -# --- store --- -proc storeQuery*( - self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int -): Future[Result[StoreQueryResponse, string]] {.async.} = - try: - if self.node.wakuStoreClient.isNil(): - return err("wakuStoreClient is not mounted") - - let remotePeer = parsePeerInfo(peer).valueOr: - return err("storeQuery failed to parse peer addr: " & $error) - - let queryFut = self.node.wakuStoreClient.query(request, remotePeer) - if not await queryFut.withTimeout(timeoutMs.milliseconds): - return err("storeQuery timed out") - - let queryResponse = queryFut.read().valueOr: - return err("storeQuery failed: " & $error) - - return ok(queryResponse) - except CatchableError as e: - return err(e.msg) - -# --- peer management --- -proc connect*( - self: Waku, peers: seq[string], timeoutMs: uint32 -): Future[Result[bool, string]] {.async.} = - try: - await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static") - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc disconnectPeerById*( - self: Waku, peerId: string -): Future[Result[bool, string]] {.async.} = - try: - let pId = PeerId.init(peerId).valueOr: - return err($error) - await self.node.peerManager.disconnectNode(pId) - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} = - try: - await self.node.peerManager.disconnectAllPeers() - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc dialPeer*( - self: Waku, peerAddr: string, protocol: string, timeoutMs: int -): Future[Result[bool, string]] {.async.} = - try: - let remotePeerInfo = parsePeerInfo(peerAddr).valueOr: - return err($error) - let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol) - if conn.isNone(): - return err("failed dialing peer") - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc dialPeerById*( - self: Waku, peerId: string, protocol: string, timeoutMs: int -): Future[Result[bool, string]] {.async.} = - try: - let pId = PeerId.init(peerId).valueOr: - return err($error) - let conn = await self.node.peerManager.dialPeer(pId, protocol) - if conn.isNone(): - return err("failed dialing peer") - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} = - try: - return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId)) - except CatchableError as e: - return err(e.msg) - -proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.async.} = - try: - return ok( - self.node.peerManager.switch.peerStore - .peers() - .filterIt(it.connectedness == Connected) - .mapIt($it.peerId) - ) - except CatchableError as e: - return err(e.msg) - -proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} = - try: - let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers() - return ok(concat(inPeerIds, outPeerIds).mapIt($it)) - except CatchableError as e: - return err(e.msg) - -proc peerIdsByProtocol*( - self: Waku, protocol: string -): Future[Result[seq[string], string]] {.async.} = - try: - return ok( - self.node.peerManager.switch.peerStore - .peers(protocol) - .filterIt(it.connectedness == Connected) - .mapIt($it.peerId) - ) - except CatchableError as e: - return err(e.msg) - -# --- discovery --- -proc dnsDiscovery*( - self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int -): Future[Result[seq[string], string]] {.async.} = - try: - let dnsNameServers = @[parseIpAddress(nameServer)] - let discoveredPeers = ( - await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers) - ).valueOr: - return err("failed discovering peers from DNS: " & $error) - - var multiAddresses = newSeq[string]() - for discPeer in discoveredPeers: - for address in discPeer.addrs: - multiAddresses.add($address & "/p2p/" & $discPeer) - - return ok(multiAddresses) - except CatchableError as e: - return err(e.msg) - -proc discv5UpdateBootnodes*( - self: Waku, bootnodes: seq[string] -): Future[Result[bool, string]] {.async.} = - try: - if self.wakuDiscv5.isNil(): - return err("discv5 not started") - let jsonArray = "[" & bootnodes.mapIt("\"" & it & "\"").join(",") & "]" - self.wakuDiscv5.updateBootstrapRecords(jsonArray).isOkOr: - return err("error in discv5UpdateBootnodes: " & $error) - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = - try: - if self.wakuDiscv5.isNil(): - return err("discv5 not started") - (await self.wakuDiscv5.start()).isOkOr: - return err("error starting discv5: " & $error) - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = - try: - if self.wakuDiscv5.isNil(): - return err("discv5 not started") - await self.wakuDiscv5.stop() - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc peerExchangeRequest*( - self: Waku, numPeers: uint64 -): Future[Result[int, string]] {.async.} = - try: - let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr: - return err("failed peer exchange: " & $error) - return ok(numPeersRecv) - except CatchableError as e: - return err(e.msg) - -# --- debug / info --- -proc version*(self: Waku): Future[Result[string, string]] {.async.} = - return ok(WakuNodeVersionString) - -proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} = - try: - return ok(self.node.info().listenAddresses) - except CatchableError as e: - return err(e.msg) - -proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} = - try: - return ok(self.node.enr.toURI()) - except CatchableError as e: - return err(e.msg) - -proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} = - try: - return ok($self.node.peerId()) - except CatchableError as e: - return err(e.msg) - -proc metrics*(self: Waku): Future[Result[string, string]] {.async.} = - {.gcsafe.}: - try: - return ok(defaultRegistry.toText()) - except CatchableError as e: - return err(e.msg) - -proc pingPeer*( - self: Waku, peerAddr: string, timeoutMs: int -): Future[Result[int64, string]] {.async.} = - try: - let peerInfo = parsePeerInfo(peerAddr).valueOr: - return err("pingPeer failed to parse peer addr: " & $error) - - let conn = await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec) - defer: - await conn.close() - let pingRTT = await self.node.libp2pPing.ping(conn) - - if pingRTT == 0.nanos: - return err("could not ping peer: rtt-0") - - return ok(pingRTT.nanos) - except CatchableError as e: - return err(e.msg) - {.pop.} diff --git a/logos_delivery/waku/waku_filter_v2/client.nim b/logos_delivery/waku/waku_filter_v2/client.nim index ddae363a3..51ed18af3 100644 --- a/logos_delivery/waku/waku_filter_v2/client.nim +++ b/logos_delivery/waku/waku_filter_v2/client.nim @@ -14,7 +14,7 @@ import brokers/broker_context import - logos_delivery/waku/[node/peer_manager, waku_core, events/delivery_events], + logos_delivery/waku/[node/peer_manager, waku_core, api/events/filter_subscribe_events], ./common, ./protocol_metrics, ./rpc_codec, diff --git a/logos_delivery/waku/waku_relay/protocol.nim b/logos_delivery/waku/waku_relay/protocol.nim index e677ec5a0..409b02929 100644 --- a/logos_delivery/waku/waku_relay/protocol.nim +++ b/logos_delivery/waku/waku_relay/protocol.nim @@ -24,9 +24,9 @@ import logos_delivery/waku/waku_core, logos_delivery/waku/node/health_monitor/topic_health, logos_delivery/waku/requests/health_requests, - logos_delivery/waku/events/health_events, + logos_delivery/waku/api/events/health_events, ./message_id, - logos_delivery/waku/events/peer_events + logos_delivery/waku/api/events/peer_events from logos_delivery/waku/waku_core/codecs import WakuRelayCodec export WakuRelayCodec diff --git a/tests/node/test_wakunode_health_monitor.nim b/tests/node/test_wakunode_health_monitor.nim index 04a39455a..acf0bc09a 100644 --- a/tests/node/test_wakunode_health_monitor.nim +++ b/tests/node/test_wakunode_health_monitor.nim @@ -19,8 +19,8 @@ import node/waku_node/store, node/waku_node/lightpush, node/waku_node/filter, - events/health_events, - events/peer_events, + api/events/health_events, + api/events/peer_events, waku_archive, ]