diff --git a/logos_delivery/waku/api/debug.nim b/logos_delivery/waku/api/debug.nim new file mode 100644 index 000000000..107903752 --- /dev/null +++ b/logos_delivery/waku/api/debug.nim @@ -0,0 +1,24 @@ +## Waku layer API — debug / info getters (all synchronous). +{.push raises: [].} + +import metrics +import eth/p2p/discoveryv5/enr + +import logos_delivery/waku/waku +import logos_delivery/waku/node/waku_node + +proc version*(self: Waku): string = + return WakuNodeVersionString + +proc listenAddresses*(self: Waku): seq[string] = + return self.node.info().listenAddresses + +proc myEnr*(self: Waku): string = + return self.node.enr.toURI() + +proc myPeerId*(self: Waku): string = + return $self.node.peerId() + +proc metrics*(self: Waku): string = + {.gcsafe.}: + return defaultRegistry.toText() diff --git a/logos_delivery/waku/api/discovery.nim b/logos_delivery/waku/api/discovery.nim new file mode 100644 index 000000000..fefeb3d5d --- /dev/null +++ b/logos_delivery/waku/api/discovery.nim @@ -0,0 +1,76 @@ +## Waku layer API — discovery operations (DNS, discv5, peer exchange). +{.push raises: [].} + +import std/[net, sequtils] +import results, chronos, chronicles + +import logos_delivery/waku/waku +import + logos_delivery/waku/[ + waku_core, + node/waku_node, + node/waku_node/peer_exchange, + discovery/waku_dnsdisc, + discovery/waku_discv5, + ] + +proc dnsDiscovery*( + self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int +): Future[Result[seq[string], string]] {.async.} = + try: + let dnsNameServers = @[parseIpAddress(nameServer)] + let discoveredPeers = ( + await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers) + ).valueOr: + return err("failed discovering peers from DNS: " & $error) + + var multiAddresses = newSeq[string]() + for discPeer in discoveredPeers: + for address in discPeer.addrs: + multiAddresses.add($address & "/p2p/" & $discPeer) + + return ok(multiAddresses) + except CatchableError as e: + return err(e.msg) + +proc discv5UpdateBootnodes*( + self: Waku, bootnodes: string +): Future[Result[bool, string]] {.async.} = + ## `bootnodes` is a JSON array of ENRs, e.g. `["enr:...", "enr:..."]`. + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + self.wakuDiscv5.updateBootstrapRecords(bootnodes).isOkOr: + return err("error in discv5UpdateBootnodes: " & $error) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + (await self.wakuDiscv5.start()).isOkOr: + return err("error starting discv5: " & $error) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + await self.wakuDiscv5.stop() + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc peerExchangeRequest*( + self: Waku, numPeers: uint64 +): Future[Result[int, string]] {.async.} = + try: + let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr: + return err("failed peer exchange: " & $error) + return ok(numPeersRecv) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/events/discovery_events.nim b/logos_delivery/waku/api/events/discovery_events.nim similarity index 100% rename from logos_delivery/waku/events/discovery_events.nim rename to logos_delivery/waku/api/events/discovery_events.nim diff --git a/logos_delivery/waku/api/events/events.nim b/logos_delivery/waku/api/events/events.nim new file mode 100644 index 000000000..88b23234a --- /dev/null +++ b/logos_delivery/waku/api/events/events.nim @@ -0,0 +1,7 @@ +import + ./[ + message_events, filter_subscribe_events, health_events, peer_events, discovery_events, + ] + +export + message_events, filter_subscribe_events, health_events, peer_events, discovery_events diff --git a/logos_delivery/waku/events/delivery_events.nim b/logos_delivery/waku/api/events/filter_subscribe_events.nim similarity index 100% rename from logos_delivery/waku/events/delivery_events.nim rename to logos_delivery/waku/api/events/filter_subscribe_events.nim diff --git a/logos_delivery/waku/events/health_events.nim b/logos_delivery/waku/api/events/health_events.nim similarity index 100% rename from logos_delivery/waku/events/health_events.nim rename to logos_delivery/waku/api/events/health_events.nim diff --git a/logos_delivery/waku/api/events/message_events.nim b/logos_delivery/waku/api/events/message_events.nim new file mode 100644 index 000000000..638d4f38a --- /dev/null +++ b/logos_delivery/waku/api/events/message_events.nim @@ -0,0 +1,10 @@ +import brokers/event_broker +import logos_delivery/api/types +import logos_delivery/waku/[waku_core/message, waku_core/topics] +export event_broker, types + +EventBroker: + # Internal event emitted when a message arrives from the network via any protocol + type MessageSeenEvent* = object + topic*: PubsubTopic + message*: WakuMessage diff --git a/logos_delivery/waku/events/peer_events.nim b/logos_delivery/waku/api/events/peer_events.nim similarity index 100% rename from logos_delivery/waku/events/peer_events.nim rename to logos_delivery/waku/api/events/peer_events.nim diff --git a/logos_delivery/waku/api/health.nim b/logos_delivery/waku/api/health.nim new file mode 100644 index 000000000..2933c1397 --- /dev/null +++ b/logos_delivery/waku/api/health.nim @@ -0,0 +1,10 @@ +## Waku layer API — health / connectivity. +{.push raises: [].} + +import results, chronos, chronicles + +import logos_delivery/waku/waku +import logos_delivery/waku/[node/health_monitor, node/health_monitor/online_monitor] + +proc isOnline*(self: Waku): bool = + return self.healthMonitor.onlineMonitor.amIOnline() diff --git a/logos_delivery/waku/api/lightpush.nim b/logos_delivery/waku/api/lightpush.nim new file mode 100644 index 000000000..8c290f38b --- /dev/null +++ b/logos_delivery/waku/api/lightpush.nim @@ -0,0 +1,35 @@ +## Waku layer API — lightpush (light client publish) operations. +import logos_delivery/waku/compat/option_valueor +{.push raises: [].} + +import results, chronos, chronicles + +import logos_delivery/waku/waku +import + logos_delivery/waku/[ + waku_core, + waku_core/codecs, + node/waku_node, + node/peer_manager, + waku_lightpush_legacy/client, + ] + +proc lightpushPublish*( + self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage +): Future[Result[string, string]] {.async.} = + ## Selects a lightpush service peer and publishes; returns the message hash. + try: + if self.node.wakuLegacyLightpushClient.isNil(): + return err("wakuLegacyLightpushClient is not mounted") + + let remotePeer = self.node.peerManager.selectPeer(WakuLightPushCodec).valueOr: + return err("failed to lightpublish message, no suitable remote peers") + + let msgHashHex = ( + await self.node.wakuLegacyLightpushClient.publish(pubsubTopic, message, remotePeer) + ).valueOr: + return err($error) + + return ok(msgHashHex) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/api/ping.nim b/logos_delivery/waku/api/ping.nim new file mode 100644 index 000000000..9cf2dfc33 --- /dev/null +++ b/logos_delivery/waku/api/ping.nim @@ -0,0 +1,45 @@ +## Waku layer API — ping operation. +{.push raises: [].} + +import results, chronos, chronicles +import libp2p/protocols/ping +import libp2p/switch + +import logos_delivery/waku/waku +import logos_delivery/waku/[waku_core, node/waku_node, node/waku_node/ping] + +proc pingPeer*( + self: Waku, peerAddr: string, timeoutMs: int +): Future[Result[int64, string]] {.async.} = + ## Pings the peer; `timeoutMs <= 0` means no timeout. Returns RTT in nanos. + try: + let peerInfo = parsePeerInfo(peerAddr).valueOr: + return err("pingPeer failed to parse peer addr: " & $error) + + proc doPing(): Future[Result[Duration, string]] {.async.} = + try: + let conn = + await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec) + defer: + await conn.close() + let rtt = await self.node.libp2pPing.ping(conn) + if rtt == 0.nanos: + return err("could not ping peer: rtt-0") + return ok(rtt) + except CatchableError as e: + return err("could not ping peer: " & e.msg) + + let pingFut = doPing() + let rtt: Duration = + if timeoutMs <= 0: + (await pingFut).valueOr: + return err(error) + else: + if not await pingFut.withTimeout(chronos.milliseconds(timeoutMs)): + return err("ping timed out") + pingFut.read().valueOr: + return err(error) + + return ok(rtt.nanos) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/api/store.nim b/logos_delivery/waku/api/store.nim new file mode 100644 index 000000000..c526f91b1 --- /dev/null +++ b/logos_delivery/waku/api/store.nim @@ -0,0 +1,31 @@ +## Waku layer API — store (historical query) operations. +{.push raises: [].} + +import results, chronos, chronicles + +import logos_delivery/waku/waku +import + logos_delivery/waku/[ + waku_core, node/waku_node, waku_store/common, waku_store/client + ] + +proc storeQuery*( + self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int +): Future[Result[StoreQueryResponse, string]] {.async.} = + try: + if self.node.wakuStoreClient.isNil(): + return err("wakuStoreClient is not mounted") + + let remotePeer = parsePeerInfo(peer).valueOr: + return err("storeQuery failed to parse peer addr: " & $error) + + let queryFut = self.node.wakuStoreClient.query(request, remotePeer) + if not await queryFut.withTimeout(timeoutMs.milliseconds): + return err("storeQuery timed out") + + let queryResponse = queryFut.read().valueOr: + return err("storeQuery failed: " & $error) + + return ok(queryResponse) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/api/topics.nim b/logos_delivery/waku/api/topics.nim new file mode 100644 index 000000000..c11586fca --- /dev/null +++ b/logos_delivery/waku/api/topics.nim @@ -0,0 +1,27 @@ +## Waku layer API — topic construction. +{.push raises: [].} + +import std/strformat +import results + +import logos_delivery/waku/waku +import logos_delivery/waku/waku_core + +proc buildContentTopic*( + self: Waku, appName: string, appVersion: uint32, name: string, encoding: string +): Result[ContentTopic, string] = + try: + return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}")) + except CatchableError as e: + return err(e.msg) + +proc buildPubsubTopic*( + self: Waku, topicName: string +): Result[PubsubTopic, string] = + try: + return ok(PubsubTopic(fmt"/waku/2/{topicName}")) + except CatchableError as e: + return err(e.msg) + +proc defaultPubsubTopic*(self: Waku): PubsubTopic = + return DefaultPubsubTopic diff --git a/logos_delivery/waku/events/events.nim b/logos_delivery/waku/events/events.nim deleted file mode 100644 index 130d7c018..000000000 --- a/logos_delivery/waku/events/events.nim +++ /dev/null @@ -1,9 +0,0 @@ -import - ./[ - message_events, delivery_events, health_events, peer_events, lifecycle_events, - discovery_events, - ] - -export - message_events, delivery_events, health_events, peer_events, lifecycle_events, - discovery_events