diff --git a/logos_delivery/waku/waku.nim b/logos_delivery/waku/waku.nim index 2c18c5f63..963d3fcb7 100644 --- a/logos_delivery/waku/waku.nim +++ b/logos_delivery/waku/waku.nim @@ -574,418 +574,4 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = return ok() -## Kernel API realization -## -# --- topic construction --- -proc buildContentTopic*( - self: Waku, appName: string, appVersion: uint32, name: string, encoding: string -): Future[Result[ContentTopic, string]] {.async.} = - try: - return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}")) - except CatchableError as e: - return err(e.msg) - -proc buildPubsubTopic*( - self: Waku, topicName: string -): Future[Result[PubsubTopic, string]] {.async.} = - try: - return ok(PubsubTopic(fmt"/waku/2/{topicName}")) - except CatchableError as e: - return err(e.msg) - -proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} = - return ok(DefaultPubsubTopic) - -# --- relay --- -proc relayPublish*( - self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32 -): Future[Result[int, string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayPublish: WakuRelay not mounted") - - let numPeers = (await self.node.wakuRelay.publish(pubsubTopic, message)).valueOr: - return err($error) - - return ok(numPeers) - except CatchableError as e: - return err(e.msg) - -proc relaySubscribe*( - self: Waku, pubsubTopic: PubsubTopic -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relaySubscribe: WakuRelay not mounted") - - self.node.subscribe( - (kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(nil) - ).isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc relayUnsubscribe*( - self: Waku, pubsubTopic: PubsubTopic -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayUnsubscribe: WakuRelay not mounted") - - self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc relayAddProtectedShard*( - self: Waku, clusterId: uint16, shardId: uint16, publicKey: string -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayAddProtectedShard: WakuRelay not mounted") - - let pubKey = SkPublicKey.fromHex(publicKey).valueOr: - return err("relayAddProtectedShard: invalid public key: " & $error) - - let protectedShard = ProtectedShard(shard: shardId, key: pubKey) - self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId) - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc relayConnectedPeers*( - self: Waku, pubsubTopic: PubsubTopic -): Future[Result[seq[string], string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayConnectedPeers: WakuRelay not mounted") - - let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr: - return err($error) - - return ok(connPeers.mapIt($it)) - except CatchableError as e: - return err(e.msg) - -proc relayPeersInMesh*( - self: Waku, pubsubTopic: PubsubTopic -): Future[Result[seq[string], string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayPeersInMesh: WakuRelay not mounted") - - let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr: - return err($error) - - return ok(meshPeers.mapIt($it)) - except CatchableError as e: - return err(e.msg) - -# --- filter --- -proc filterSubscribe*( - self: Waku, - pubsubTopic: Option[PubsubTopic], - contentTopics: seq[ContentTopic], - peer: string, -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuFilterClient.isNil(): - return err("wakuFilterClient is not mounted") - - let subFut = self.node.filterSubscribe(pubsubTopic, contentTopics, peer) - if not await subFut.withTimeout(FilterOpTimeout): - return err("filter subscription timed out") - subFut.read().isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc filterUnsubscribe*( - self: Waku, - pubsubTopic: Option[PubsubTopic], - contentTopics: seq[ContentTopic], - peer: string, -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuFilterClient.isNil(): - return err("wakuFilterClient is not mounted") - - let unsubFut = self.node.filterUnsubscribe(pubsubTopic, contentTopics, peer) - if not await unsubFut.withTimeout(FilterOpTimeout): - return err("filter un-subscription timed out") - unsubFut.read().isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc filterUnsubscribeAll*( - self: Waku, peer: string -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuFilterClient.isNil(): - return err("wakuFilterClient is not mounted") - - let unsubFut = self.node.filterUnsubscribeAll(peer) - if not await unsubFut.withTimeout(FilterOpTimeout): - return err("filter un-subscription all timed out") - unsubFut.read().isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -# --- lightpush --- -proc lightpushPublish*( - self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string -): Future[Result[string, string]] {.async.} = - try: - if self.node.wakuLegacyLightpushClient.isNil(): - return err("wakuLegacyLightpushClient is not mounted") - - let remotePeer = parsePeerInfo(peer).valueOr: - return err("lightpushPublish failed to parse peer addr: " & $error) - - let msgHashHex = ( - await self.node.wakuLegacyLightpushClient.publish( - pubsubTopic, message, remotePeer - ) - ).valueOr: - return err($error) - - return ok(msgHashHex) - except CatchableError as e: - return err(e.msg) - -# --- store --- -proc storeQuery*( - self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int -): Future[Result[StoreQueryResponse, string]] {.async.} = - try: - if self.node.wakuStoreClient.isNil(): - return err("wakuStoreClient is not mounted") - - let remotePeer = parsePeerInfo(peer).valueOr: - return err("storeQuery failed to parse peer addr: " & $error) - - let queryFut = self.node.wakuStoreClient.query(request, remotePeer) - if not await queryFut.withTimeout(timeoutMs.milliseconds): - return err("storeQuery timed out") - - let queryResponse = queryFut.read().valueOr: - return err("storeQuery failed: " & $error) - - return ok(queryResponse) - except CatchableError as e: - return err(e.msg) - -# --- peer management --- -proc connect*( - self: Waku, peers: seq[string], timeoutMs: uint32 -): Future[Result[bool, string]] {.async.} = - try: - await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static") - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc disconnectPeerById*( - self: Waku, peerId: string -): Future[Result[bool, string]] {.async.} = - try: - let pId = PeerId.init(peerId).valueOr: - return err($error) - await self.node.peerManager.disconnectNode(pId) - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} = - try: - await self.node.peerManager.disconnectAllPeers() - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc dialPeer*( - self: Waku, peerAddr: string, protocol: string, timeoutMs: int -): Future[Result[bool, string]] {.async.} = - try: - let remotePeerInfo = parsePeerInfo(peerAddr).valueOr: - return err($error) - let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol) - if conn.isNone(): - return err("failed dialing peer") - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc dialPeerById*( - self: Waku, peerId: string, protocol: string, timeoutMs: int -): Future[Result[bool, string]] {.async.} = - try: - let pId = PeerId.init(peerId).valueOr: - return err($error) - let conn = await self.node.peerManager.dialPeer(pId, protocol) - if conn.isNone(): - return err("failed dialing peer") - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} = - try: - return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId)) - except CatchableError as e: - return err(e.msg) - -proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.async.} = - try: - return ok( - self.node.peerManager.switch.peerStore - .peers() - .filterIt(it.connectedness == Connected) - .mapIt($it.peerId) - ) - except CatchableError as e: - return err(e.msg) - -proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} = - try: - let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers() - return ok(concat(inPeerIds, outPeerIds).mapIt($it)) - except CatchableError as e: - return err(e.msg) - -proc peerIdsByProtocol*( - self: Waku, protocol: string -): Future[Result[seq[string], string]] {.async.} = - try: - return ok( - self.node.peerManager.switch.peerStore - .peers(protocol) - .filterIt(it.connectedness == Connected) - .mapIt($it.peerId) - ) - except CatchableError as e: - return err(e.msg) - -# --- discovery --- -proc dnsDiscovery*( - self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int -): Future[Result[seq[string], string]] {.async.} = - try: - let dnsNameServers = @[parseIpAddress(nameServer)] - let discoveredPeers = ( - await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers) - ).valueOr: - return err("failed discovering peers from DNS: " & $error) - - var multiAddresses = newSeq[string]() - for discPeer in discoveredPeers: - for address in discPeer.addrs: - multiAddresses.add($address & "/p2p/" & $discPeer) - - return ok(multiAddresses) - except CatchableError as e: - return err(e.msg) - -proc discv5UpdateBootnodes*( - self: Waku, bootnodes: seq[string] -): Future[Result[bool, string]] {.async.} = - try: - if self.wakuDiscv5.isNil(): - return err("discv5 not started") - let jsonArray = "[" & bootnodes.mapIt("\"" & it & "\"").join(",") & "]" - self.wakuDiscv5.updateBootstrapRecords(jsonArray).isOkOr: - return err("error in discv5UpdateBootnodes: " & $error) - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = - try: - if self.wakuDiscv5.isNil(): - return err("discv5 not started") - (await self.wakuDiscv5.start()).isOkOr: - return err("error starting discv5: " & $error) - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = - try: - if self.wakuDiscv5.isNil(): - return err("discv5 not started") - await self.wakuDiscv5.stop() - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc peerExchangeRequest*( - self: Waku, numPeers: uint64 -): Future[Result[int, string]] {.async.} = - try: - let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr: - return err("failed peer exchange: " & $error) - return ok(numPeersRecv) - except CatchableError as e: - return err(e.msg) - -# --- debug / info --- -proc version*(self: Waku): Future[Result[string, string]] {.async.} = - return ok(WakuNodeVersionString) - -proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} = - try: - return ok(self.node.info().listenAddresses) - except CatchableError as e: - return err(e.msg) - -proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} = - try: - return ok(self.node.enr.toURI()) - except CatchableError as e: - return err(e.msg) - -proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} = - try: - return ok($self.node.peerId()) - except CatchableError as e: - return err(e.msg) - -proc metrics*(self: Waku): Future[Result[string, string]] {.async.} = - {.gcsafe.}: - try: - return ok(defaultRegistry.toText()) - except CatchableError as e: - return err(e.msg) - -proc pingPeer*( - self: Waku, peerAddr: string, timeoutMs: int -): Future[Result[int64, string]] {.async.} = - try: - let peerInfo = parsePeerInfo(peerAddr).valueOr: - return err("pingPeer failed to parse peer addr: " & $error) - - let conn = await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec) - defer: - await conn.close() - let pingRTT = await self.node.libp2pPing.ping(conn) - - if pingRTT == 0.nanos: - return err("could not ping peer: rtt-0") - - return ok(pingRTT.nanos) - except CatchableError as e: - return err(e.msg) - {.pop.}