mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-06-26 11:29:28 +00:00
waku: extract relay/filter/peer_manager API into waku/api/
Move the relay, filter and peer-manager operations out of the monolithic waku/waku.nim into focused logos_delivery/waku/api/ modules over the Waku type, with the matching node-level touch-ups. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
b6c6b298e5
commit
d7264a798c
88
logos_delivery/waku/api/filter.nim
Normal file
88
logos_delivery/waku/api/filter.nim
Normal file
@ -0,0 +1,88 @@
|
||||
## Waku layer API — filter (light client) operations.
|
||||
import logos_delivery/waku/compat/option_valueor
|
||||
{.push raises: [].}
|
||||
|
||||
import std/options
|
||||
import results, chronos, chronicles
|
||||
|
||||
import logos_delivery/waku/waku
|
||||
import
|
||||
logos_delivery/waku/[
|
||||
waku_core,
|
||||
waku_core/subscription/push_handler,
|
||||
node/waku_node,
|
||||
node/waku_node/filter,
|
||||
node/peer_manager,
|
||||
waku_filter_v2/client,
|
||||
waku_filter_v2/common,
|
||||
]
|
||||
|
||||
const FilterOpTimeout = 5.seconds
|
||||
|
||||
proc filterSubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: PubsubTopic,
|
||||
contentTopics: seq[ContentTopic],
|
||||
pushHandler: FilterPushHandler,
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
## Registers `pushHandler` for incoming filtered messages, selects a filter
|
||||
## service peer, and subscribes.
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
self.node.wakuFilterClient.registerPushHandler(pushHandler)
|
||||
|
||||
let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
|
||||
return err("could not find peer with WakuFilterSubscribeCodec when subscribing")
|
||||
|
||||
let subFut = self.node.filterSubscribe(some(pubsubTopic), contentTopics, peer)
|
||||
if not await subFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter subscription timed out")
|
||||
subFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
## Selects a filter service peer and unsubscribes the given content topics.
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
|
||||
return err("could not find peer with WakuFilterSubscribeCodec when unsubscribing")
|
||||
|
||||
let unsubFut = self.node.filterUnsubscribe(some(pubsubTopic), contentTopics, peer)
|
||||
if not await unsubFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter un-subscription timed out")
|
||||
unsubFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribeAll*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
## Selects a filter service peer and unsubscribes from everything.
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
|
||||
return
|
||||
err("could not find peer with WakuFilterSubscribeCodec when unsubscribing all")
|
||||
|
||||
let unsubFut = self.node.filterUnsubscribeAll(peer)
|
||||
if not await unsubFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter un-subscription all timed out")
|
||||
unsubFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
112
logos_delivery/waku/api/peer_manager.nim
Normal file
112
logos_delivery/waku/api/peer_manager.nim
Normal file
@ -0,0 +1,112 @@
|
||||
## Waku layer API — peer management operations.
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[options, sequtils, strutils]
|
||||
import results, chronos, chronicles
|
||||
import libp2p/[peerid, peerstore]
|
||||
|
||||
import logos_delivery/waku/waku
|
||||
import logos_delivery/waku/[waku_core, node/waku_node, node/peer_manager]
|
||||
|
||||
type PeerConnInfo* = object ## structured connected-peer info for the api boundary
|
||||
peerId*: string
|
||||
protocols*: seq[string]
|
||||
addresses*: seq[string]
|
||||
|
||||
proc connect*(
|
||||
self: Waku, peers: seq[string], timeoutMs: uint32
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectPeerById*(
|
||||
self: Waku, peerId: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
await self.node.peerManager.disconnectNode(pId)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
await self.node.peerManager.disconnectAllPeers()
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeer*(
|
||||
self: Waku, peerAddr: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let remotePeerInfo = parsePeerInfo(peerAddr).valueOr:
|
||||
return err($error)
|
||||
let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol)
|
||||
if conn.isNone():
|
||||
return err("failed dialing peer")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeerById*(
|
||||
self: Waku, peerId: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
let conn = await self.node.peerManager.dialPeer(pId, protocol)
|
||||
if conn.isNone():
|
||||
return err("failed dialing peer")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeersInfo*(
|
||||
self: Waku
|
||||
): Future[Result[seq[PeerConnInfo], string]] {.async.} =
|
||||
## Structured info (protocols, addresses) for every connected peer.
|
||||
try:
|
||||
var infos: seq[PeerConnInfo]
|
||||
for peer in self.node.peerManager.switch.peerStore.peers():
|
||||
if peer.connectedness == Connected:
|
||||
infos.add(
|
||||
PeerConnInfo(
|
||||
peerId: $peer.peerId,
|
||||
protocols: peer.protocols,
|
||||
addresses: peer.addrs.mapIt($it),
|
||||
)
|
||||
)
|
||||
return ok(infos)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers()
|
||||
return ok(concat(inPeerIds, outPeerIds).mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsByProtocol*(
|
||||
self: Waku, protocol: string
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(
|
||||
self.node.peerManager.switch.peerStore
|
||||
.peers(protocol)
|
||||
.filterIt(it.connectedness == Connected)
|
||||
.mapIt($it.peerId)
|
||||
)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
134
logos_delivery/waku/api/relay.nim
Normal file
134
logos_delivery/waku/api/relay.nim
Normal file
@ -0,0 +1,134 @@
|
||||
## Waku layer API — relay (gossipsub) operations.
|
||||
{.push raises: [].}
|
||||
|
||||
import std/sequtils
|
||||
import results, chronos, chronicles, secp256k1, stew/byteutils
|
||||
|
||||
import logos_delivery/waku/waku
|
||||
import
|
||||
logos_delivery/waku/[
|
||||
waku_core,
|
||||
node/waku_node,
|
||||
node/waku_node/relay,
|
||||
node/subscription_manager,
|
||||
waku_relay/protocol,
|
||||
factory/waku_conf,
|
||||
factory/validator_signed,
|
||||
]
|
||||
|
||||
proc relayPublish*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
|
||||
): Future[Result[string, string]] {.async.} =
|
||||
## Publishes `message` and returns its message hash (0x-hex).
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPublish: WakuRelay not mounted")
|
||||
|
||||
(await self.node.wakuRelay.publish(pubsubTopic, message)).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(computeMessageHash(pubsubTopic, message).to0xHex)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relaySubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: PubsubTopic,
|
||||
handler: WakuRelayHandler = WakuRelayHandler(nil),
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
## Subscribes to `pubsubTopic`. `handler` (optional) is invoked per message;
|
||||
## pass nil to subscribe without a message callback.
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relaySubscribe: WakuRelay not mounted")
|
||||
|
||||
self.node.subscribe(
|
||||
(kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), handler
|
||||
).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayUnsubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayUnsubscribe: WakuRelay not mounted")
|
||||
|
||||
self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayAddProtectedShard*(
|
||||
self: Waku, clusterId: uint16, shardId: uint16, publicKey: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayAddProtectedShard: WakuRelay not mounted")
|
||||
|
||||
let pubKey = SkPublicKey.fromHex(publicKey).valueOr:
|
||||
return err("relayAddProtectedShard: invalid public key: " & $error)
|
||||
|
||||
let protectedShard = ProtectedShard(shard: shardId, key: pubKey)
|
||||
self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayConnectedPeers*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayConnectedPeers: WakuRelay not mounted")
|
||||
|
||||
let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(connPeers.mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayPeersInMesh*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPeersInMesh: WakuRelay not mounted")
|
||||
|
||||
let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(meshPeers.mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayNumPeersInMesh*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayNumPeersInMesh: WakuRelay not mounted")
|
||||
let n = self.node.wakuRelay.getNumPeersInMesh(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
return ok(n)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayNumConnectedPeers*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayNumConnectedPeers: WakuRelay not mounted")
|
||||
let n = self.node.wakuRelay.getNumConnectedPeers(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
return ok(n)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
@ -21,7 +21,7 @@ import
|
||||
import
|
||||
logos_delivery/waku/waku_core,
|
||||
logos_delivery/waku/node/peer_manager,
|
||||
logos_delivery/waku/events/discovery_events
|
||||
logos_delivery/waku/api/events/discovery_events
|
||||
|
||||
logScope:
|
||||
topics = "waku service discovery"
|
||||
|
||||
@ -1,5 +1,9 @@
|
||||
import ../waku_relay, ../node/peer_manager, ../node/health_monitor/connection_status
|
||||
|
||||
# Re-export the modules that define the handler types below, so that consumers
|
||||
# of `AppCallbacks` (e.g. the FFI library) can construct the handlers.
|
||||
export waku_relay, peer_manager, connection_status
|
||||
|
||||
type AppCallbacks* = ref object
|
||||
relayHandler*: WakuRelayHandler
|
||||
topicHealthChangeHandler*: TopicHealthChangeHandler
|
||||
|
||||
@ -39,7 +39,7 @@ import
|
||||
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
|
||||
../waku_lightpush_legacy/common,
|
||||
../common/rate_limit/setting,
|
||||
../events/discovery_events
|
||||
../api/events/discovery_events
|
||||
|
||||
## Peer persistence
|
||||
|
||||
|
||||
@ -12,8 +12,8 @@ import
|
||||
logos_delivery/waku/[
|
||||
waku_relay,
|
||||
waku_rln_relay,
|
||||
events/health_events,
|
||||
events/peer_events,
|
||||
api/events/health_events,
|
||||
api/events/peer_events,
|
||||
node/waku_node,
|
||||
node/node_telemetry,
|
||||
node/peer_manager,
|
||||
|
||||
@ -20,7 +20,7 @@ import
|
||||
waku_relay/protocol,
|
||||
waku_enr/sharding,
|
||||
waku_enr/capabilities,
|
||||
events/peer_events,
|
||||
api/events/peer_events,
|
||||
common/nimchronos,
|
||||
common/enr,
|
||||
common/callbacks,
|
||||
|
||||
@ -15,9 +15,9 @@ import
|
||||
waku_filter_v2/common as filter_common,
|
||||
waku_filter_v2/client as filter_client,
|
||||
waku_filter_v2/protocol as filter_protocol,
|
||||
events/health_events,
|
||||
events/message_events,
|
||||
events/peer_events,
|
||||
api/events/health_events,
|
||||
api/events/message_events,
|
||||
api/events/peer_events,
|
||||
requests/health_requests,
|
||||
node/peer_manager,
|
||||
node/health_monitor/topic_health,
|
||||
|
||||
@ -59,9 +59,9 @@ import
|
||||
waku_mix,
|
||||
requests/node_requests,
|
||||
requests/health_requests,
|
||||
events/health_events,
|
||||
events/message_events,
|
||||
events/peer_events,
|
||||
api/events/health_events,
|
||||
api/events/message_events,
|
||||
api/events/peer_events,
|
||||
],
|
||||
logos_delivery/waku/discovery/waku_kademlia,
|
||||
logos_delivery/waku/net/[bound_ports, net_config],
|
||||
|
||||
@ -32,7 +32,7 @@ import
|
||||
node/waku_node,
|
||||
node/subscription_manager,
|
||||
node/peer_manager,
|
||||
events/message_events,
|
||||
api/events/message_events,
|
||||
]
|
||||
|
||||
export waku_relay.WakuRelayHandler
|
||||
|
||||
@ -63,7 +63,6 @@ logScope:
|
||||
# Git version in git describe format (defined at compile time)
|
||||
const git_version* {.strdefine.} = "n/a"
|
||||
|
||||
const FilterOpTimeout = 5.seconds
|
||||
|
||||
type Waku* = ref object
|
||||
stateInfo*: WakuStateInfo
|
||||
@ -574,418 +573,4 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
|
||||
return ok()
|
||||
|
||||
## Kernel API realization
|
||||
##
|
||||
# --- topic construction ---
|
||||
proc buildContentTopic*(
|
||||
self: Waku, appName: string, appVersion: uint32, name: string, encoding: string
|
||||
): Future[Result[ContentTopic, string]] {.async.} =
|
||||
try:
|
||||
return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}"))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc buildPubsubTopic*(
|
||||
self: Waku, topicName: string
|
||||
): Future[Result[PubsubTopic, string]] {.async.} =
|
||||
try:
|
||||
return ok(PubsubTopic(fmt"/waku/2/{topicName}"))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} =
|
||||
return ok(DefaultPubsubTopic)
|
||||
|
||||
# --- relay ---
|
||||
proc relayPublish*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPublish: WakuRelay not mounted")
|
||||
|
||||
let numPeers = (await self.node.wakuRelay.publish(pubsubTopic, message)).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(numPeers)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relaySubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relaySubscribe: WakuRelay not mounted")
|
||||
|
||||
self.node.subscribe(
|
||||
(kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(nil)
|
||||
).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayUnsubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayUnsubscribe: WakuRelay not mounted")
|
||||
|
||||
self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayAddProtectedShard*(
|
||||
self: Waku, clusterId: uint16, shardId: uint16, publicKey: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayAddProtectedShard: WakuRelay not mounted")
|
||||
|
||||
let pubKey = SkPublicKey.fromHex(publicKey).valueOr:
|
||||
return err("relayAddProtectedShard: invalid public key: " & $error)
|
||||
|
||||
let protectedShard = ProtectedShard(shard: shardId, key: pubKey)
|
||||
self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayConnectedPeers*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayConnectedPeers: WakuRelay not mounted")
|
||||
|
||||
let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(connPeers.mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayPeersInMesh*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPeersInMesh: WakuRelay not mounted")
|
||||
|
||||
let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(meshPeers.mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- filter ---
|
||||
proc filterSubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
peer: string,
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let subFut = self.node.filterSubscribe(pubsubTopic, contentTopics, peer)
|
||||
if not await subFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter subscription timed out")
|
||||
subFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
peer: string,
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let unsubFut = self.node.filterUnsubscribe(pubsubTopic, contentTopics, peer)
|
||||
if not await unsubFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter un-subscription timed out")
|
||||
unsubFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribeAll*(
|
||||
self: Waku, peer: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let unsubFut = self.node.filterUnsubscribeAll(peer)
|
||||
if not await unsubFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter un-subscription all timed out")
|
||||
unsubFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- lightpush ---
|
||||
proc lightpushPublish*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string
|
||||
): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuLegacyLightpushClient.isNil():
|
||||
return err("wakuLegacyLightpushClient is not mounted")
|
||||
|
||||
let remotePeer = parsePeerInfo(peer).valueOr:
|
||||
return err("lightpushPublish failed to parse peer addr: " & $error)
|
||||
|
||||
let msgHashHex = (
|
||||
await self.node.wakuLegacyLightpushClient.publish(
|
||||
pubsubTopic, message, remotePeer
|
||||
)
|
||||
).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(msgHashHex)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- store ---
|
||||
proc storeQuery*(
|
||||
self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int
|
||||
): Future[Result[StoreQueryResponse, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuStoreClient.isNil():
|
||||
return err("wakuStoreClient is not mounted")
|
||||
|
||||
let remotePeer = parsePeerInfo(peer).valueOr:
|
||||
return err("storeQuery failed to parse peer addr: " & $error)
|
||||
|
||||
let queryFut = self.node.wakuStoreClient.query(request, remotePeer)
|
||||
if not await queryFut.withTimeout(timeoutMs.milliseconds):
|
||||
return err("storeQuery timed out")
|
||||
|
||||
let queryResponse = queryFut.read().valueOr:
|
||||
return err("storeQuery failed: " & $error)
|
||||
|
||||
return ok(queryResponse)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- peer management ---
|
||||
proc connect*(
|
||||
self: Waku, peers: seq[string], timeoutMs: uint32
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectPeerById*(
|
||||
self: Waku, peerId: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
await self.node.peerManager.disconnectNode(pId)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
await self.node.peerManager.disconnectAllPeers()
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeer*(
|
||||
self: Waku, peerAddr: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let remotePeerInfo = parsePeerInfo(peerAddr).valueOr:
|
||||
return err($error)
|
||||
let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol)
|
||||
if conn.isNone():
|
||||
return err("failed dialing peer")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeerById*(
|
||||
self: Waku, peerId: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
let conn = await self.node.peerManager.dialPeer(pId, protocol)
|
||||
if conn.isNone():
|
||||
return err("failed dialing peer")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(
|
||||
self.node.peerManager.switch.peerStore
|
||||
.peers()
|
||||
.filterIt(it.connectedness == Connected)
|
||||
.mapIt($it.peerId)
|
||||
)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers()
|
||||
return ok(concat(inPeerIds, outPeerIds).mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsByProtocol*(
|
||||
self: Waku, protocol: string
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(
|
||||
self.node.peerManager.switch.peerStore
|
||||
.peers(protocol)
|
||||
.filterIt(it.connectedness == Connected)
|
||||
.mapIt($it.peerId)
|
||||
)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- discovery ---
|
||||
proc dnsDiscovery*(
|
||||
self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
let dnsNameServers = @[parseIpAddress(nameServer)]
|
||||
let discoveredPeers = (
|
||||
await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers)
|
||||
).valueOr:
|
||||
return err("failed discovering peers from DNS: " & $error)
|
||||
|
||||
var multiAddresses = newSeq[string]()
|
||||
for discPeer in discoveredPeers:
|
||||
for address in discPeer.addrs:
|
||||
multiAddresses.add($address & "/p2p/" & $discPeer)
|
||||
|
||||
return ok(multiAddresses)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc discv5UpdateBootnodes*(
|
||||
self: Waku, bootnodes: seq[string]
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
let jsonArray = "[" & bootnodes.mapIt("\"" & it & "\"").join(",") & "]"
|
||||
self.wakuDiscv5.updateBootstrapRecords(jsonArray).isOkOr:
|
||||
return err("error in discv5UpdateBootnodes: " & $error)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
(await self.wakuDiscv5.start()).isOkOr:
|
||||
return err("error starting discv5: " & $error)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
await self.wakuDiscv5.stop()
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerExchangeRequest*(
|
||||
self: Waku, numPeers: uint64
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
try:
|
||||
let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr:
|
||||
return err("failed peer exchange: " & $error)
|
||||
return ok(numPeersRecv)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- debug / info ---
|
||||
proc version*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
return ok(WakuNodeVersionString)
|
||||
|
||||
proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.info().listenAddresses)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.enr.toURI())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
return ok($self.node.peerId())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc metrics*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
{.gcsafe.}:
|
||||
try:
|
||||
return ok(defaultRegistry.toText())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc pingPeer*(
|
||||
self: Waku, peerAddr: string, timeoutMs: int
|
||||
): Future[Result[int64, string]] {.async.} =
|
||||
try:
|
||||
let peerInfo = parsePeerInfo(peerAddr).valueOr:
|
||||
return err("pingPeer failed to parse peer addr: " & $error)
|
||||
|
||||
let conn = await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
|
||||
defer:
|
||||
await conn.close()
|
||||
let pingRTT = await self.node.libp2pPing.ping(conn)
|
||||
|
||||
if pingRTT == 0.nanos:
|
||||
return err("could not ping peer: rtt-0")
|
||||
|
||||
return ok(pingRTT.nanos)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
{.pop.}
|
||||
|
||||
@ -14,7 +14,7 @@ import
|
||||
brokers/broker_context
|
||||
|
||||
import
|
||||
logos_delivery/waku/[node/peer_manager, waku_core, events/delivery_events],
|
||||
logos_delivery/waku/[node/peer_manager, waku_core, api/events/filter_subscribe_events],
|
||||
./common,
|
||||
./protocol_metrics,
|
||||
./rpc_codec,
|
||||
|
||||
@ -24,9 +24,9 @@ import
|
||||
logos_delivery/waku/waku_core,
|
||||
logos_delivery/waku/node/health_monitor/topic_health,
|
||||
logos_delivery/waku/requests/health_requests,
|
||||
logos_delivery/waku/events/health_events,
|
||||
logos_delivery/waku/api/events/health_events,
|
||||
./message_id,
|
||||
logos_delivery/waku/events/peer_events
|
||||
logos_delivery/waku/api/events/peer_events
|
||||
|
||||
from logos_delivery/waku/waku_core/codecs import WakuRelayCodec
|
||||
export WakuRelayCodec
|
||||
|
||||
@ -19,8 +19,8 @@ import
|
||||
node/waku_node/store,
|
||||
node/waku_node/lightpush,
|
||||
node/waku_node/filter,
|
||||
events/health_events,
|
||||
events/peer_events,
|
||||
api/events/health_events,
|
||||
api/events/peer_events,
|
||||
waku_archive,
|
||||
]
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user