From 10634f8a05ba59a63f792c532ea7882de6a78068 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 23 Jun 2026 11:24:11 +0200 Subject: [PATCH] Realize Kernel API in scope of Waku class --- logos_delivery/waku/waku.nim | 424 +++++++++++++++++++++++++++++++++++ 1 file changed, 424 insertions(+) diff --git a/logos_delivery/waku/waku.nim b/logos_delivery/waku/waku.nim index 963d3fcb7..0112d15d7 100644 --- a/logos_delivery/waku/waku.nim +++ b/logos_delivery/waku/waku.nim @@ -574,4 +574,428 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = return ok() +## Kernel API realization +## +# --- topic construction --- +proc buildContentTopic*( + self: Waku, appName: string, appVersion: uint32, name: string, encoding: string +): Future[Result[ContentTopic, string]] {.async.} = + try: + return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}")) + except CatchableError as e: + return err(e.msg) + +proc buildPubsubTopic*( + self: Waku, topicName: string +): Future[Result[PubsubTopic, string]] {.async.} = + try: + return ok(PubsubTopic(fmt"/waku/2/{topicName}")) + except CatchableError as e: + return err(e.msg) + +proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} = + return ok(DefaultPubsubTopic) + +# --- relay --- +proc relayPublish*( + self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32 +): Future[Result[int, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayPublish: WakuRelay not mounted") + + let numPeers = (await self.node.wakuRelay.publish(pubsubTopic, message)).valueOr: + return err($error) + + return ok(numPeers) + except CatchableError as e: + return err(e.msg) + +proc relaySubscribe*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relaySubscribe: WakuRelay not mounted") + + let handler = proc(topic: PubsubTopic, msg: WakuMessage) {.async.} = + ## Bridge inbound relay traffic to the `ReceivedMessage` kernel event + ## (replaces libwaku's set_event_callback message path). + ReceivedMessage.emit( + self.brokerCtx, ReceivedMessage(pubsubTopic: topic, message: msg) + ) + + self.node.subscribe( + (kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(handler) + ).isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc relayUnsubscribe*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayUnsubscribe: WakuRelay not mounted") + + self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc relayAddProtectedShard*( + self: Waku, clusterId: uint16, shardId: uint16, publicKey: string +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayAddProtectedShard: WakuRelay not mounted") + + let pubKey = SkPublicKey.fromHex(publicKey).valueOr: + return err("relayAddProtectedShard: invalid public key: " & $error) + + let protectedShard = ProtectedShard(shard: shardId, key: pubKey) + self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc relayConnectedPeers*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[seq[string], string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayConnectedPeers: WakuRelay not mounted") + + let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr: + return err($error) + + return ok(connPeers.mapIt($it)) + except CatchableError as e: + return err(e.msg) + +proc relayPeersInMesh*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[seq[string], string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayPeersInMesh: WakuRelay not mounted") + + let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr: + return err($error) + + return ok(meshPeers.mapIt($it)) + except CatchableError as e: + return err(e.msg) + +# --- filter --- +proc filterSubscribe*( + self: Waku, + pubsubTopic: Option[PubsubTopic], + contentTopics: seq[ContentTopic], + peer: string, +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + let subFut = self.node.filterSubscribe(pubsubTopic, contentTopics, peer) + if not await subFut.withTimeout(FilterOpTimeout): + return err("filter subscription timed out") + subFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc filterUnsubscribe*( + self: Waku, + pubsubTopic: Option[PubsubTopic], + contentTopics: seq[ContentTopic], + peer: string, +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + let unsubFut = self.node.filterUnsubscribe(pubsubTopic, contentTopics, peer) + if not await unsubFut.withTimeout(FilterOpTimeout): + return err("filter un-subscription timed out") + unsubFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc filterUnsubscribeAll*( + self: Waku, peer: string +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + let unsubFut = self.node.filterUnsubscribeAll(peer) + if not await unsubFut.withTimeout(FilterOpTimeout): + return err("filter un-subscription all timed out") + unsubFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +# --- lightpush --- +proc lightpushPublish*( + self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string +): Future[Result[string, string]] {.async.} = + try: + if self.node.wakuLegacyLightpushClient.isNil(): + return err("wakuLegacyLightpushClient is not mounted") + + let remotePeer = parsePeerInfo(peer).valueOr: + return err("lightpushPublish failed to parse peer addr: " & $error) + + let msgHashHex = ( + await self.node.wakuLegacyLightpushClient.publish( + pubsubTopic, message, remotePeer + ) + ).valueOr: + return err($error) + + return ok(msgHashHex) + except CatchableError as e: + return err(e.msg) + +# --- store --- +proc storeQuery*( + self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int +): Future[Result[StoreQueryResponse, string]] {.async.} = + try: + if self.node.wakuStoreClient.isNil(): + return err("wakuStoreClient is not mounted") + + let remotePeer = parsePeerInfo(peer).valueOr: + return err("storeQuery failed to parse peer addr: " & $error) + + let queryFut = self.node.wakuStoreClient.query(request, remotePeer) + if not await queryFut.withTimeout(timeoutMs.milliseconds): + return err("storeQuery timed out") + + let queryResponse = queryFut.read().valueOr: + return err("storeQuery failed: " & $error) + + return ok(queryResponse) + except CatchableError as e: + return err(e.msg) + +# --- peer management --- +proc connect*( + self: Waku, peers: seq[string], timeoutMs: uint32 +): Future[Result[bool, string]] {.async.} = + try: + await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static") + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc disconnectPeerById*( + self: Waku, peerId: string +): Future[Result[bool, string]] {.async.} = + try: + let pId = PeerId.init(peerId).valueOr: + return err($error) + await self.node.peerManager.disconnectNode(pId) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} = + try: + await self.node.peerManager.disconnectAllPeers() + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc dialPeer*( + self: Waku, peerAddr: string, protocol: string, timeoutMs: int +): Future[Result[bool, string]] {.async.} = + try: + let remotePeerInfo = parsePeerInfo(peerAddr).valueOr: + return err($error) + let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol) + if conn.isNone(): + return err("failed dialing peer") + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc dialPeerById*( + self: Waku, peerId: string, protocol: string, timeoutMs: int +): Future[Result[bool, string]] {.async.} = + try: + let pId = PeerId.init(peerId).valueOr: + return err($error) + let conn = await self.node.peerManager.dialPeer(pId, protocol) + if conn.isNone(): + return err("failed dialing peer") + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId)) + except CatchableError as e: + return err(e.msg) + +proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + return ok( + self.node.peerManager.switch.peerStore + .peers() + .filterIt(it.connectedness == Connected) + .mapIt($it.peerId) + ) + except CatchableError as e: + return err(e.msg) + +proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers() + return ok(concat(inPeerIds, outPeerIds).mapIt($it)) + except CatchableError as e: + return err(e.msg) + +proc peerIdsByProtocol*( + self: Waku, protocol: string +): Future[Result[seq[string], string]] {.async.} = + try: + return ok( + self.node.peerManager.switch.peerStore + .peers(protocol) + .filterIt(it.connectedness == Connected) + .mapIt($it.peerId) + ) + except CatchableError as e: + return err(e.msg) + +# --- discovery --- +proc dnsDiscovery*( + self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int +): Future[Result[seq[string], string]] {.async.} = + try: + let dnsNameServers = @[parseIpAddress(nameServer)] + let discoveredPeers = ( + await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers) + ).valueOr: + return err("failed discovering peers from DNS: " & $error) + + var multiAddresses = newSeq[string]() + for discPeer in discoveredPeers: + for address in discPeer.addrs: + multiAddresses.add($address & "/p2p/" & $discPeer) + + return ok(multiAddresses) + except CatchableError as e: + return err(e.msg) + +proc discv5UpdateBootnodes*( + self: Waku, bootnodes: seq[string] +): Future[Result[bool, string]] {.async.} = + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + let jsonArray = "[" & bootnodes.mapIt("\"" & it & "\"").join(",") & "]" + self.wakuDiscv5.updateBootstrapRecords(jsonArray).isOkOr: + return err("error in discv5UpdateBootnodes: " & $error) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + (await self.wakuDiscv5.start()).isOkOr: + return err("error starting discv5: " & $error) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + await self.wakuDiscv5.stop() + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc peerExchangeRequest*( + self: Waku, numPeers: uint64 +): Future[Result[int, string]] {.async.} = + try: + let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr: + return err("failed peer exchange: " & $error) + return ok(numPeersRecv) + except CatchableError as e: + return err(e.msg) + +# --- debug / info --- +proc version*(self: Waku): Future[Result[string, string]] {.async.} = + return ok(WakuNodeVersionString) + +proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + return ok(self.node.info().listenAddresses) + except CatchableError as e: + return err(e.msg) + +proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} = + try: + return ok(self.node.enr.toURI()) + except CatchableError as e: + return err(e.msg) + +proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} = + try: + return ok($self.node.peerId()) + except CatchableError as e: + return err(e.msg) + +proc metrics*(self: Waku): Future[Result[string, string]] {.async.} = + {.gcsafe.}: + try: + return ok(defaultRegistry.toText()) + except CatchableError as e: + return err(e.msg) + +proc isOnline*(self: Waku): Future[Result[bool, string]] {.async.} = + return ok(self.healthMonitor.onlineMonitor.amIOnline()) + +proc pingPeer*( + self: Waku, peerAddr: string, timeoutMs: int +): Future[Result[int64, string]] {.async.} = + try: + let peerInfo = parsePeerInfo(peerAddr).valueOr: + return err("pingPeer failed to parse peer addr: " & $error) + + let conn = await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec) + defer: + await conn.close() + let pingRTT = await self.node.libp2pPing.ping(conn) + + if pingRTT == 0.nanos: + return err("could not ping peer: rtt-0") + + return ok(pingRTT.nanos) + except CatchableError as e: + return err(e.msg) + {.pop.}