waku: extract node/query API into waku/api/ and move events

Move the discovery/lightpush/store/ping/debug/health/topics operations into
logos_delivery/waku/api/ and relocate the waku event definitions under
waku/api/events/.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Ivan FB 2026-06-25 04:15:23 +02:00
parent d7264a798c
commit 8c274be44d
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
14 changed files with 265 additions and 9 deletions

View File

@ -0,0 +1,24 @@
## Waku layer API — debug / info getters (all synchronous).
{.push raises: [].}
import metrics
import eth/p2p/discoveryv5/enr
import logos_delivery/waku/waku
import logos_delivery/waku/node/waku_node
proc version*(self: Waku): string =
return WakuNodeVersionString
proc listenAddresses*(self: Waku): seq[string] =
return self.node.info().listenAddresses
proc myEnr*(self: Waku): string =
return self.node.enr.toURI()
proc myPeerId*(self: Waku): string =
return $self.node.peerId()
proc metrics*(self: Waku): string =
{.gcsafe.}:
return defaultRegistry.toText()

View File

@ -0,0 +1,76 @@
## Waku layer API — discovery operations (DNS, discv5, peer exchange).
{.push raises: [].}
import std/[net, sequtils]
import results, chronos, chronicles
import logos_delivery/waku/waku
import
logos_delivery/waku/[
waku_core,
node/waku_node,
node/waku_node/peer_exchange,
discovery/waku_dnsdisc,
discovery/waku_discv5,
]
proc dnsDiscovery*(
self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int
): Future[Result[seq[string], string]] {.async.} =
try:
let dnsNameServers = @[parseIpAddress(nameServer)]
let discoveredPeers = (
await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers)
).valueOr:
return err("failed discovering peers from DNS: " & $error)
var multiAddresses = newSeq[string]()
for discPeer in discoveredPeers:
for address in discPeer.addrs:
multiAddresses.add($address & "/p2p/" & $discPeer)
return ok(multiAddresses)
except CatchableError as e:
return err(e.msg)
proc discv5UpdateBootnodes*(
self: Waku, bootnodes: string
): Future[Result[bool, string]] {.async.} =
## `bootnodes` is a JSON array of ENRs, e.g. `["enr:...", "enr:..."]`.
try:
if self.wakuDiscv5.isNil():
return err("discv5 not started")
self.wakuDiscv5.updateBootstrapRecords(bootnodes).isOkOr:
return err("error in discv5UpdateBootnodes: " & $error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
try:
if self.wakuDiscv5.isNil():
return err("discv5 not started")
(await self.wakuDiscv5.start()).isOkOr:
return err("error starting discv5: " & $error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
try:
if self.wakuDiscv5.isNil():
return err("discv5 not started")
await self.wakuDiscv5.stop()
return ok(true)
except CatchableError as e:
return err(e.msg)
proc peerExchangeRequest*(
self: Waku, numPeers: uint64
): Future[Result[int, string]] {.async.} =
try:
let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr:
return err("failed peer exchange: " & $error)
return ok(numPeersRecv)
except CatchableError as e:
return err(e.msg)

View File

@ -0,0 +1,7 @@
import
./[
message_events, filter_subscribe_events, health_events, peer_events, discovery_events,
]
export
message_events, filter_subscribe_events, health_events, peer_events, discovery_events

View File

@ -0,0 +1,10 @@
import brokers/event_broker
import logos_delivery/api/types
import logos_delivery/waku/[waku_core/message, waku_core/topics]
export event_broker, types
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageSeenEvent* = object
topic*: PubsubTopic
message*: WakuMessage

View File

@ -0,0 +1,10 @@
## Waku layer API — health / connectivity.
{.push raises: [].}
import results, chronos, chronicles
import logos_delivery/waku/waku
import logos_delivery/waku/[node/health_monitor, node/health_monitor/online_monitor]
proc isOnline*(self: Waku): bool =
return self.healthMonitor.onlineMonitor.amIOnline()

View File

@ -0,0 +1,35 @@
## Waku layer API — lightpush (light client publish) operations.
import logos_delivery/waku/compat/option_valueor
{.push raises: [].}
import results, chronos, chronicles
import logos_delivery/waku/waku
import
logos_delivery/waku/[
waku_core,
waku_core/codecs,
node/waku_node,
node/peer_manager,
waku_lightpush_legacy/client,
]
proc lightpushPublish*(
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[Result[string, string]] {.async.} =
## Selects a lightpush service peer and publishes; returns the message hash.
try:
if self.node.wakuLegacyLightpushClient.isNil():
return err("wakuLegacyLightpushClient is not mounted")
let remotePeer = self.node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
return err("failed to lightpublish message, no suitable remote peers")
let msgHashHex = (
await self.node.wakuLegacyLightpushClient.publish(pubsubTopic, message, remotePeer)
).valueOr:
return err($error)
return ok(msgHashHex)
except CatchableError as e:
return err(e.msg)

View File

@ -0,0 +1,45 @@
## Waku layer API — ping operation.
{.push raises: [].}
import results, chronos, chronicles
import libp2p/protocols/ping
import libp2p/switch
import logos_delivery/waku/waku
import logos_delivery/waku/[waku_core, node/waku_node, node/waku_node/ping]
proc pingPeer*(
self: Waku, peerAddr: string, timeoutMs: int
): Future[Result[int64, string]] {.async.} =
## Pings the peer; `timeoutMs <= 0` means no timeout. Returns RTT in nanos.
try:
let peerInfo = parsePeerInfo(peerAddr).valueOr:
return err("pingPeer failed to parse peer addr: " & $error)
proc doPing(): Future[Result[Duration, string]] {.async.} =
try:
let conn =
await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
defer:
await conn.close()
let rtt = await self.node.libp2pPing.ping(conn)
if rtt == 0.nanos:
return err("could not ping peer: rtt-0")
return ok(rtt)
except CatchableError as e:
return err("could not ping peer: " & e.msg)
let pingFut = doPing()
let rtt: Duration =
if timeoutMs <= 0:
(await pingFut).valueOr:
return err(error)
else:
if not await pingFut.withTimeout(chronos.milliseconds(timeoutMs)):
return err("ping timed out")
pingFut.read().valueOr:
return err(error)
return ok(rtt.nanos)
except CatchableError as e:
return err(e.msg)

View File

@ -0,0 +1,31 @@
## Waku layer API — store (historical query) operations.
{.push raises: [].}
import results, chronos, chronicles
import logos_delivery/waku/waku
import
logos_delivery/waku/[
waku_core, node/waku_node, waku_store/common, waku_store/client
]
proc storeQuery*(
self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int
): Future[Result[StoreQueryResponse, string]] {.async.} =
try:
if self.node.wakuStoreClient.isNil():
return err("wakuStoreClient is not mounted")
let remotePeer = parsePeerInfo(peer).valueOr:
return err("storeQuery failed to parse peer addr: " & $error)
let queryFut = self.node.wakuStoreClient.query(request, remotePeer)
if not await queryFut.withTimeout(timeoutMs.milliseconds):
return err("storeQuery timed out")
let queryResponse = queryFut.read().valueOr:
return err("storeQuery failed: " & $error)
return ok(queryResponse)
except CatchableError as e:
return err(e.msg)

View File

@ -0,0 +1,27 @@
## Waku layer API — topic construction.
{.push raises: [].}
import std/strformat
import results
import logos_delivery/waku/waku
import logos_delivery/waku/waku_core
proc buildContentTopic*(
self: Waku, appName: string, appVersion: uint32, name: string, encoding: string
): Result[ContentTopic, string] =
try:
return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}"))
except CatchableError as e:
return err(e.msg)
proc buildPubsubTopic*(
self: Waku, topicName: string
): Result[PubsubTopic, string] =
try:
return ok(PubsubTopic(fmt"/waku/2/{topicName}"))
except CatchableError as e:
return err(e.msg)
proc defaultPubsubTopic*(self: Waku): PubsubTopic =
return DefaultPubsubTopic

View File

@ -1,9 +0,0 @@
import
./[
message_events, delivery_events, health_events, peer_events, lifecycle_events,
discovery_events,
]
export
message_events, delivery_events, health_events, peer_events, lifecycle_events,
discovery_events