Merge 68ef4c70d029b3e909e3143181a9f7724304a009 into 26098d76cf7a30f00c76dc5a01d0d74614674628

This commit is contained in:
NagyZoltanPeter 2026-06-26 19:12:45 +02:00 committed by GitHub
commit d2a3b004a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
60 changed files with 1510 additions and 1037 deletions

View File

@ -0,0 +1,76 @@
import std/json
import chronos, results, ffi
import
logos_delivery/waku/common/base64,
logos_delivery,
logos_delivery/waku/waku_core/topics/content_topic,
logos_delivery/api/types,
../declare_lib
proc logosdelivery_channel_create(
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
channelIdStr: cstring,
contentTopicStr: cstring,
senderIdStr: cstring,
) {.ffi.} =
requireInitializedNode(ctx, "ChannelCreate"):
return err(errMsg)
let id = ctx.myLib[].reliableChannelManager.createReliableChannel(
ChannelId($channelIdStr),
ContentTopic($contentTopicStr),
SdsParticipantID($senderIdStr),
).valueOr:
return err("ChannelCreate failed: " & $error)
return ok(string(id))
proc logosdelivery_channel_send(
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
channelIdStr: cstring,
messageJson: cstring,
) {.ffi.} =
## `messageJson` carries `{ "payload": <base64>, "ephemeral": <bool> }`.
requireInitializedNode(ctx, "ChannelSend"):
return err(errMsg)
var jsonNode: JsonNode
try:
jsonNode = parseJson($messageJson)
except Exception as e:
return err("Failed to parse channel message JSON: " & e.msg)
if not jsonNode.hasKey("payload"):
return err("Missing payload field")
let payload = base64.decode(Base64String(jsonNode["payload"].getStr())).valueOr:
return err("invalid payload format: " & error)
let ephemeral = jsonNode.getOrDefault("ephemeral").getBool(false)
let requestId = (
await ctx.myLib[].reliableChannelManager.send(
ChannelId($channelIdStr), payload, ephemeral
)
).valueOr:
return err("ChannelSend failed: " & $error)
return ok($requestId)
proc logosdelivery_channel_close(
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
channelIdStr: cstring,
) {.ffi.} =
requireInitializedNode(ctx, "ChannelClose"):
return err(errMsg)
(await ctx.myLib[].reliableChannelManager.closeChannel(ChannelId($channelIdStr))).isOkOr:
return err("ChannelClose failed: " & $error)
return ok("")

View File

@ -1,53 +1,46 @@
import std/json
import
chronicles,
chronos,
results,
eth/p2p/discoveryv5/enr,
strutils,
libp2p/peerid,
metrics,
ffi
import
logos_delivery/waku/waku,
logos_delivery/waku/node/waku_node,
logos_delivery/waku/node/health_monitor,
library/declare_lib
proc getMultiaddresses(node: WakuNode): seq[string] =
return node.info().listenAddresses
proc getMetrics(): string =
{.gcsafe.}:
return defaultRegistry.toText() ## defaultRegistry is {.global.} in metrics module
import std/strutils
import chronos, results, ffi
import logos_delivery, library/declare_lib
proc waku_version(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok(WakuNodeVersionString)
let v = (await ctx.myLib[].waku.version()).valueOr:
return err(error)
return ok(v)
proc waku_listen_addresses(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
## returns a comma-separated string of the listen addresses
return ok(ctx.myLib[].waku.node.getMultiaddresses().join(","))
let addrs = (await ctx.myLib[].waku.listenAddresses()).valueOr:
return err(error)
return ok(addrs.join(","))
proc waku_get_my_enr(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok(ctx.myLib[].waku.node.enr.toURI())
let enrUri = (await ctx.myLib[].waku.myEnr()).valueOr:
return err(error)
return ok(enrUri)
proc waku_get_my_peerid(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok($ctx.myLib[].waku.node.peerId())
let peerId = (await ctx.myLib[].waku.myPeerId()).valueOr:
return err(error)
return ok(peerId)
proc waku_get_metrics(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok(getMetrics())
let m = (await ctx.myLib[].waku.metrics()).valueOr:
return err(error)
return ok(m)
proc waku_is_online(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok($ctx.myLib[].waku.healthMonitor.onlineMonitor.amIOnline())
let online = (await ctx.myLib[].waku.isOnline()).valueOr:
return err(error)
return ok($online)

View File

@ -1,42 +1,6 @@
import logos_delivery/waku/compat/option_valueor
import std/json
import chronos, chronicles, results, strutils, libp2p/multiaddress, ffi
import
logos_delivery/waku/waku,
logos_delivery/waku/discovery/waku_dnsdisc,
logos_delivery/waku/discovery/waku_discv5,
logos_delivery/waku/waku_core/peers,
logos_delivery/waku/waku_node,
library/declare_lib
proc retrieveBootstrapNodes(
enrTreeUrl: string, ipDnsServer: string
): Future[Result[seq[string], string]] {.async.} =
let dnsNameServers = @[parseIpAddress(ipDnsServer)]
let discoveredPeers: seq[RemotePeerInfo] = (
await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers)
).valueOr:
return err("failed discovering peers from DNS: " & $error)
var multiAddresses = newSeq[string]()
for discPeer in discoveredPeers:
for address in discPeer.addrs:
multiAddresses.add($address & "/p2p/" & $discPeer)
return ok(multiAddresses)
proc updateDiscv5BootstrapNodes(nodes: string, waku: Waku): Result[void, string] =
waku.wakuDiscv5.updateBootstrapRecords(nodes).isOkOr:
return err("error in updateDiscv5BootstrapNodes: " & $error)
return ok()
proc performPeerExchangeRequestTo*(
numPeers: uint64, waku: Waku
): Future[Result[int, string]] {.async.} =
let numPeersRecv = (await waku.node.fetchPeerExchangePeers(numPeers)).valueOr:
return err($error)
return ok(numPeersRecv)
import std/strutils
import chronos, chronicles, results, ffi
import logos_delivery, library/declare_lib
proc waku_discv5_update_bootnodes(
ctx: ptr FFIContext[LogosDelivery],
@ -46,11 +10,9 @@ proc waku_discv5_update_bootnodes(
) {.ffi.} =
## Updates the bootnode list used for discovering new peers via DiscoveryV5
## bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
updateDiscv5BootstrapNodes($bootnodes, ctx.myLib[].waku).isOkOr:
(await ctx.myLib[].waku.discv5UpdateBootnodes($bootnodes)).isOkOr:
error "UPDATE_DISCV5_BOOTSTRAP_NODES failed", error = error
return err($error)
return err(error)
return ok("discovery request processed correctly")
proc waku_dns_discovery(
@ -61,26 +23,28 @@ proc waku_dns_discovery(
nameDnsServer: cstring,
timeoutMs: cint,
) {.ffi.} =
let nodes = (await retrieveBootstrapNodes($enrTreeUrl, $nameDnsServer)).valueOr:
let nodes = (
await ctx.myLib[].waku.dnsDiscovery($enrTreeUrl, $nameDnsServer, int(timeoutMs))
).valueOr:
error "GET_BOOTSTRAP_NODES failed", error = error
return err($error)
return err(error)
## returns a comma-separated string of bootstrap nodes' multiaddresses
return ok(nodes.join(","))
proc waku_start_discv5(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
(await ctx.myLib[].waku.wakuDiscv5.start()).isOkOr:
(await ctx.myLib[].waku.startDiscv5()).isOkOr:
error "START_DISCV5 failed", error = error
return err("error starting discv5: " & $error)
return err(error)
return ok("discv5 started correctly")
proc waku_stop_discv5(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
await ctx.myLib[].waku.wakuDiscv5.stop()
(await ctx.myLib[].waku.stopDiscv5()).isOkOr:
error "STOP_DISCV5 failed", error = error
return err(error)
return ok("discv5 stopped correctly")
proc waku_peer_exchange_request(
@ -89,8 +53,7 @@ proc waku_peer_exchange_request(
userData: pointer,
numPeers: uint64,
) {.ffi.} =
let numValidPeers = (await performPeerExchangeRequestTo(numPeers, ctx.myLib[].waku)).valueOr:
let numValidPeers = (await ctx.myLib[].waku.peerExchangeRequest(numPeers)).valueOr:
error "waku_peer_exchange_request failed", error = error
return err("failed peer exchange: " & $error)
return err(error)
return ok($numValidPeers)

View File

@ -1,11 +1,6 @@
import logos_delivery/waku/compat/option_valueor
import std/[sequtils, strutils, tables]
import chronicles, chronos, results, options, json, ffi
import
logos_delivery/waku/waku,
logos_delivery/waku/node/waku_node,
logos_delivery/waku/node/peer_manager,
library/declare_lib
import std/[strutils, tables, json]
import chronicles, chronos, results, ffi
import logos_delivery, library/declare_lib
type PeerInfo = object
protocols: seq[string]
@ -15,11 +10,9 @@ proc waku_get_peerids_from_peerstore(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
## returns a comma-separated string of peerIDs
let peerIDs = ctx.myLib[].waku.node.peerManager.switch.peerStore
.peers()
.mapIt($it.peerId)
.join(",")
return ok(peerIDs)
let peerIds = (await ctx.myLib[].waku.peerIdsFromPeerstore()).valueOr:
return err(error)
return ok(peerIds.join(","))
proc waku_connect(
ctx: ptr FFIContext[LogosDelivery],
@ -28,8 +21,9 @@ proc waku_connect(
peerMultiAddr: cstring,
timeoutMs: cuint,
) {.ffi.} =
let peers = ($peerMultiAddr).split(",").mapIt(strip(it))
await ctx.myLib[].waku.node.connectToNodes(peers, source = "static")
let peers = ($peerMultiAddr).split(",")
(await ctx.myLib[].waku.connect(peers, uint32(timeoutMs))).isOkOr:
return err(error)
return ok("")
proc waku_disconnect_peer_by_id(
@ -38,16 +32,16 @@ proc waku_disconnect_peer_by_id(
userData: pointer,
peerId: cstring,
) {.ffi.} =
let pId = PeerId.init($peerId).valueOr:
error "DISCONNECT_PEER_BY_ID failed", error = $error
return err($error)
await ctx.myLib[].waku.node.peerManager.disconnectNode(pId)
(await ctx.myLib[].waku.disconnectPeerById($peerId)).isOkOr:
error "DISCONNECT_PEER_BY_ID failed", error = error
return err(error)
return ok("")
proc waku_disconnect_all_peers(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
await ctx.myLib[].waku.node.peerManager.disconnectAllPeers()
(await ctx.myLib[].waku.disconnectAllPeers()).isOkOr:
return err(error)
return ok("")
proc waku_dial_peer(
@ -58,14 +52,9 @@ proc waku_dial_peer(
protocol: cstring,
timeoutMs: cuint,
) {.ffi.} =
let remotePeerInfo = parsePeerInfo($peerMultiAddr).valueOr:
error "DIAL_PEER failed", error = $error
return err($error)
let conn = await ctx.myLib[].waku.node.peerManager.dialPeer(remotePeerInfo, $protocol)
if conn.isNone():
let msg = "failed dialing peer"
error "DIAL_PEER failed", error = msg, peerId = $remotePeerInfo.peerId
return err(msg)
(await ctx.myLib[].waku.dialPeer($peerMultiAddr, $protocol, int(timeoutMs))).isOkOr:
error "DIAL_PEER failed", error = error
return err(error)
return ok("")
proc waku_dial_peer_by_id(
@ -76,47 +65,32 @@ proc waku_dial_peer_by_id(
protocol: cstring,
timeoutMs: cuint,
) {.ffi.} =
let pId = PeerId.init($peerId).valueOr:
error "DIAL_PEER_BY_ID failed", error = $error
return err($error)
let conn = await ctx.myLib[].waku.node.peerManager.dialPeer(pId, $protocol)
if conn.isNone():
let msg = "failed dialing peer"
error "DIAL_PEER_BY_ID failed", error = msg, peerId = $peerId
return err(msg)
(await ctx.myLib[].waku.dialPeerById($peerId, $protocol, int(timeoutMs))).isOkOr:
error "DIAL_PEER_BY_ID failed", error = error
return err(error)
return ok("")
proc waku_get_connected_peers_info(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
## returns a JSON string mapping peerIDs to objects with protocols and addresses
let peers = (await ctx.myLib[].waku.connectedPeersInfo()).valueOr:
return err(error)
var peersMap = initTable[string, PeerInfo]()
let peers = ctx.myLib[].waku.node.peerManager.switch.peerStore.peers().filterIt(
it.connectedness == Connected
)
# Build a map of peer IDs to peer info objects
for peer in peers:
let peerIdStr = $peer.peerId
peersMap[peerIdStr] =
PeerInfo(protocols: peer.protocols, addresses: peer.addrs.mapIt($it))
peersMap[peer.peerId] =
PeerInfo(protocols: peer.protocols, addresses: peer.addresses)
# Convert the map to JSON string
let jsonObj = %*peersMap
let jsonStr = $jsonObj
return ok(jsonStr)
return ok($(%*peersMap))
proc waku_get_connected_peers(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
## returns a comma-separated string of peerIDs
let
(inPeerIds, outPeerIds) = ctx.myLib[].waku.node.peerManager.connectedPeers()
connectedPeerids = concat(inPeerIds, outPeerIds)
return ok(connectedPeerids.mapIt($it).join(","))
let peerIds = (await ctx.myLib[].waku.connectedPeers()).valueOr:
return err(error)
return ok(peerIds.join(","))
proc waku_get_peerids_by_protocol(
ctx: ptr FFIContext[LogosDelivery],
@ -125,9 +99,6 @@ proc waku_get_peerids_by_protocol(
protocol: cstring,
) {.ffi.} =
## returns a comma-separated string of peerIDs that mount the given protocol
let connectedPeers = ctx.myLib[].waku.node.peerManager.switch.peerStore
.peers($protocol)
.filterIt(it.connectedness == Connected)
.mapIt($it.peerId)
.join(",")
return ok(connectedPeers)
let peerIds = (await ctx.myLib[].waku.peerIdsByProtocol($protocol)).valueOr:
return err(error)
return ok(peerIds.join(","))

View File

@ -1,7 +1,5 @@
import std/[json, strutils]
import chronos, results, ffi
import libp2p/[protocols/ping, switch, multiaddress, multicodec]
import logos_delivery/waku/[waku, waku_core/peers, node/waku_node], library/declare_lib
import logos_delivery, library/declare_lib
proc waku_ping_peer(
ctx: ptr FFIContext[LogosDelivery],
@ -10,35 +8,6 @@ proc waku_ping_peer(
peerAddr: cstring,
timeoutMs: cuint,
) {.ffi.} =
let peerInfo = peers.parsePeerInfo(($peerAddr).split(",")).valueOr:
return err("PingRequest failed to parse peer addr: " & $error)
let timeout = chronos.milliseconds(timeoutMs)
proc ping(): Future[Result[Duration, string]] {.async, gcsafe.} =
try:
let conn = await ctx.myLib[].waku.node.switch.dial(
peerInfo.peerId, peerInfo.addrs, PingCodec
)
defer:
await conn.close()
let pingRTT = await ctx.myLib[].waku.node.libp2pPing.ping(conn)
if pingRTT == 0.nanos:
return err("could not ping peer: rtt-0")
return ok(pingRTT)
except CatchableError as exc:
return err("could not ping peer: " & exc.msg)
let pingFuture = ping()
let pingRTT: Duration =
if timeout == chronos.milliseconds(0): # No timeout expected
(await pingFuture).valueOr:
return err("ping failed, no timeout expected: " & error)
else:
let timedOut = not (await pingFuture.withTimeout(timeout))
if timedOut:
return err("ping timed out")
pingFuture.read().valueOr:
return err("failed to read ping future: " & error)
return ok($(pingRTT.nanos))
let rttNanos = (await ctx.myLib[].waku.pingPeer($peerAddr, int(timeoutMs))).valueOr:
return err(error)
return ok($rttNanos)

View File

@ -1,29 +1,14 @@
import logos_delivery/waku/compat/option_valueor
import options, std/[strutils, sequtils]
import std/[strutils, sequtils]
import chronicles, chronos, results, ffi
import
logos_delivery/waku/waku_filter_v2/client,
logos_delivery,
logos_delivery/waku/waku_core/message/message,
logos_delivery/waku/waku,
logos_delivery/waku/waku_relay,
logos_delivery/waku/waku_filter_v2/common,
logos_delivery/waku/waku_core/subscription/push_handler,
logos_delivery/waku/node/peer_manager/peer_manager,
logos_delivery/waku/waku_node,
logos_delivery/waku/waku_core/topics/pubsub_topic,
logos_delivery/waku/waku_core/topics/content_topic,
library/events/json_message_event,
library/declare_lib
const FilterOpTimeout = 5.seconds
proc checkFilterClientMounted(waku: Waku): Result[string, string] =
if waku.node.wakuFilterClient.isNil():
let errorMsg = "wakuFilterClient is not mounted"
error "fail filter process", error = errorMsg
return err(errorMsg)
return ok("")
proc waku_filter_subscribe(
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
@ -31,33 +16,20 @@ proc waku_filter_subscribe(
pubSubTopic: cstring,
contentTopics: cstring,
) {.ffi.} =
proc onReceivedMessage(ctx: ptr FFIContext): WakuRelayHandler =
proc onReceivedMessage(ctx: ptr FFIContext[LogosDelivery]): FilterPushHandler =
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
callEventCallback(ctx, "onReceivedMessage"):
$JsonMessageEvent.new(pubsubTopic, msg)
checkFilterClientMounted(ctx.myLib[].waku).isOkOr:
return err($error)
var filterPushEventCallback = FilterPushHandler(onReceivedMessage(ctx))
ctx.myLib[].waku.node.wakuFilterClient.registerPushHandler(filterPushEventCallback)
let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg = "could not find peer with WakuFilterSubscribeCodec when subscribing"
error "fail filter subscribe", error = errorMsg
return err(errorMsg)
let subFut = ctx.myLib[].waku.node.filterSubscribe(
some(PubsubTopic($pubsubTopic)),
($contentTopics).split(",").mapIt(ContentTopic(it)),
peer,
)
if not await subFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter subscription timed out"
error "fail filter unsubscribe", error = errorMsg
return err(errorMsg)
(
await ctx.myLib[].waku.filterSubscribe(
PubsubTopic($pubSubTopic),
($contentTopics).split(",").mapIt(ContentTopic(it)),
FilterPushHandler(onReceivedMessage(ctx)),
)
).isOkOr:
error "fail filter subscribe", error = error
return err(error)
return ok("")
proc waku_filter_unsubscribe(
@ -67,43 +39,19 @@ proc waku_filter_unsubscribe(
pubSubTopic: cstring,
contentTopics: cstring,
) {.ffi.} =
checkFilterClientMounted(ctx.myLib[].waku).isOkOr:
return err($error)
let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when unsubscribing"
error "fail filter process", error = errorMsg
return err(errorMsg)
let subFut = ctx.myLib[].waku.node.filterUnsubscribe(
some(PubsubTopic($pubsubTopic)),
($contentTopics).split(",").mapIt(ContentTopic(it)),
peer,
)
if not await subFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter un-subscription timed out"
error "fail filter unsubscribe", error = errorMsg
return err(errorMsg)
(
await ctx.myLib[].waku.filterUnsubscribe(
PubsubTopic($pubSubTopic), ($contentTopics).split(",").mapIt(ContentTopic(it))
)
).isOkOr:
error "fail filter unsubscribe", error = error
return err(error)
return ok("")
proc waku_filter_unsubscribe_all(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
checkFilterClientMounted(ctx.myLib[].waku).isOkOr:
return err($error)
let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when unsubscribing all"
error "fail filter unsubscribe all", error = errorMsg
return err(errorMsg)
let unsubFut = ctx.myLib[].waku.node.filterUnsubscribeAll(peer)
if not await unsubFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter un-subscription all timed out"
error "fail filter unsubscribe all", error = errorMsg
return err(errorMsg)
(await ctx.myLib[].waku.filterUnsubscribeAll()).isOkOr:
error "fail filter unsubscribe all", error = error
return err(error)
return ok("")

View File

@ -1,14 +1,9 @@
import logos_delivery/waku/compat/option_valueor
import options, std/[json, strformat]
import std/[json, strformat]
import chronicles, chronos, results, ffi
import
logos_delivery/waku/waku_core/message/message,
logos_delivery/waku/waku_core/codecs,
logos_delivery/waku/waku,
logos_delivery,
logos_delivery/waku/waku_core/message,
logos_delivery/waku/waku_core/topics/pubsub_topic,
logos_delivery/waku/waku_lightpush_legacy/client,
logos_delivery/waku/node/peer_manager/peer_manager,
library/events/json_message_event,
library/declare_lib
@ -19,11 +14,6 @@ proc waku_lightpush_publish(
pubSubTopic: cstring,
jsonWakuMessage: cstring,
) {.ffi.} =
if ctx.myLib[].waku.node.wakuLightpushClient.isNil():
let errorMsg = "LightpushRequest waku.node.wakuLightpushClient is nil"
error "PUBLISH failed", error = errorMsg
return err(errorMsg)
var jsonMessage: JsonMessage
try:
let jsonContent = parseJson($jsonWakuMessage)
@ -35,18 +25,10 @@ proc waku_lightpush_publish(
let msg = json_message_event.toWakuMessage(jsonMessage).valueOr:
return err("Problem building the WakuMessage: " & $error)
let peerOpt = ctx.myLib[].waku.node.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
let errorMsg = "failed to lightpublish message, no suitable remote peers"
error "PUBLISH failed", error = errorMsg
return err(errorMsg)
let msgHashHex = (
await ctx.myLib[].waku.node.wakuLegacyLightpushClient.publish(
$pubsubTopic, msg, peer = peerOpt.get()
)
await ctx.myLib[].waku.lightpushPublish(PubsubTopic($pubSubTopic), msg)
).valueOr:
error "PUBLISH failed", error = error
return err($error)
return err(error)
return ok(msgHashHex)

View File

@ -1,17 +1,10 @@
import logos_delivery/waku/compat/option_valueor
import std/[net, sequtils, strutils, json], strformat
import chronicles, chronos, stew/byteutils, results, ffi
import std/[strutils, json]
import chronicles, chronos, results, ffi
import
logos_delivery/waku/waku_core/message/message,
logos_delivery/waku/factory/validator_signed,
logos_delivery/waku/waku,
tools/confutils/cli_args,
logos_delivery/waku/waku_core/message,
logos_delivery,
logos_delivery/waku/waku_core/topics/pubsub_topic,
logos_delivery/waku/waku_core/topics,
logos_delivery/waku/node/waku_node/relay,
logos_delivery/waku/waku_core/message,
logos_delivery/waku/waku_relay/protocol,
logos_delivery/waku/node/peer_manager,
library/events/json_message_event,
library/declare_lib
@ -21,11 +14,11 @@ proc waku_relay_get_peers_in_mesh(
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
let meshPeers = ctx.myLib[].waku.node.wakuRelay.getPeersInMesh($pubsubTopic).valueOr:
let peers = (await ctx.myLib[].waku.relayPeersInMesh(PubsubTopic($pubSubTopic))).valueOr:
error "LIST_MESH_PEERS failed", error = error
return err($error)
return err(error)
## returns a comma-separated string of peerIDs
return ok(meshPeers.mapIt($it).join(","))
return ok(peers.join(","))
proc waku_relay_get_num_peers_in_mesh(
ctx: ptr FFIContext[LogosDelivery],
@ -33,10 +26,10 @@ proc waku_relay_get_num_peers_in_mesh(
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
let numPeersInMesh = ctx.myLib[].waku.node.wakuRelay.getNumPeersInMesh($pubsubTopic).valueOr:
let n = (await ctx.myLib[].waku.relayNumPeersInMesh(PubsubTopic($pubSubTopic))).valueOr:
error "NUM_MESH_PEERS failed", error = error
return err($error)
return ok($numPeersInMesh)
return err(error)
return ok($n)
proc waku_relay_get_connected_peers(
ctx: ptr FFIContext[LogosDelivery],
@ -45,11 +38,10 @@ proc waku_relay_get_connected_peers(
pubSubTopic: cstring,
) {.ffi.} =
## Returns the list of all connected peers to an specific pubsub topic
let connPeers = ctx.myLib[].waku.node.wakuRelay.getConnectedPeers($pubsubTopic).valueOr:
let peers = (await ctx.myLib[].waku.relayConnectedPeers(PubsubTopic($pubSubTopic))).valueOr:
error "LIST_CONNECTED_PEERS failed", error = error
return err($error)
## returns a comma-separated string of peerIDs
return ok(connPeers.mapIt($it).join(","))
return err(error)
return ok(peers.join(","))
proc waku_relay_get_num_connected_peers(
ctx: ptr FFIContext[LogosDelivery],
@ -57,10 +49,10 @@ proc waku_relay_get_num_connected_peers(
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
let numConnPeers = ctx.myLib[].waku.node.wakuRelay.getNumConnectedPeers($pubsubTopic).valueOr:
let n = (await ctx.myLib[].waku.relayNumConnectedPeers(PubsubTopic($pubSubTopic))).valueOr:
error "NUM_CONNECTED_PEERS failed", error = error
return err($error)
return ok($numConnPeers)
return err(error)
return ok($n)
proc waku_relay_add_protected_shard(
ctx: ptr FFIContext[LogosDelivery],
@ -71,15 +63,12 @@ proc waku_relay_add_protected_shard(
publicKey: cstring,
) {.ffi.} =
## Protects a shard with a public key
try:
let relayShard = RelayShard(clusterId: uint16(clusterId), shardId: uint16(shardId))
let protectedShard = ProtectedShard.parseCmdArg($relayShard & ":" & $publicKey)
ctx.myLib[].waku.node.wakuRelay.addSignedShardsValidator(
@[protectedShard], uint16(clusterId)
(
await ctx.myLib[].waku.relayAddProtectedShard(
uint16(clusterId), uint16(shardId), $publicKey
)
except ValueError as exc:
return err("ERROR in waku_relay_add_protected_shard: " & exc.msg)
).isOkOr:
return err(error)
return ok("")
proc waku_relay_subscribe(
@ -88,20 +77,18 @@ proc waku_relay_subscribe(
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
echo "Subscribing to topic: " & $pubSubTopic & " ..."
proc onReceivedMessage(ctx: ptr FFIContext[LogosDelivery]): WakuRelayHandler =
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
callEventCallback(ctx, "onReceivedMessage"):
$JsonMessageEvent.new(pubsubTopic, msg)
var cb = onReceivedMessage(ctx)
ctx.myLib[].waku.node.subscribe(
(kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic),
handler = WakuRelayHandler(cb),
(
await ctx.myLib[].waku.relaySubscribe(
PubsubTopic($pubSubTopic), WakuRelayHandler(onReceivedMessage(ctx))
)
).isOkOr:
error "SUBSCRIBE failed", error = error
return err($error)
return err(error)
return ok("")
proc waku_relay_unsubscribe(
@ -110,12 +97,9 @@ proc waku_relay_unsubscribe(
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
ctx.myLib[].waku.node.unsubscribe(
(kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic)
).isOkOr:
(await ctx.myLib[].waku.relayUnsubscribe(PubsubTopic($pubSubTopic))).isOkOr:
error "UNSUBSCRIBE failed", error = error
return err($error)
return err(error)
return ok("")
proc waku_relay_publish(
@ -126,31 +110,30 @@ proc waku_relay_publish(
jsonWakuMessage: cstring,
timeoutMs: cuint,
) {.ffi.} =
var
# https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms
jsonMessage: JsonMessage
var jsonMessage: JsonMessage
try:
let jsonContent = parseJson($jsonWakuMessage)
jsonMessage = JsonMessage.fromJsonNode(jsonContent).valueOr:
raise newException(JsonParsingError, $error)
except JsonParsingError as exc:
return err(fmt"Error parsing json message: {exc.msg}")
return err("Error parsing json message: " & exc.msg)
let msg = json_message_event.toWakuMessage(jsonMessage).valueOr:
return err("Problem building the WakuMessage: " & $error)
(await ctx.myLib[].waku.node.wakuRelay.publish($pubsubTopic, msg)).isOkOr:
let msgHash = (
await ctx.myLib[].waku.relayPublish(PubsubTopic($pubSubTopic), msg, uint32(timeoutMs))
).valueOr:
error "PUBLISH failed", error = error
return err($error)
let msgHash = computeMessageHash($pubSubTopic, msg).to0xHex
return err(error)
return ok(msgHash)
proc waku_default_pubsub_topic(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic
return ok(DefaultPubsubTopic)
let topic = (await ctx.myLib[].waku.defaultPubsubTopic()).valueOr:
return err(error)
return ok(string(topic))
proc waku_content_topic(
ctx: ptr FFIContext[LogosDelivery],
@ -161,9 +144,13 @@ proc waku_content_topic(
contentTopicName: cstring,
encoding: cstring,
) {.ffi.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding
return ok(fmt"/{$appName}/{$appVersion}/{$contentTopicName}/{$encoding}")
let topic = (
await ctx.myLib[].waku.buildContentTopic(
$appName, uint32(appVersion), $contentTopicName, $encoding
)
).valueOr:
return err(error)
return ok(string(topic))
proc waku_pubsub_topic(
ctx: ptr FFIContext[LogosDelivery],
@ -171,5 +158,6 @@ proc waku_pubsub_topic(
userData: pointer,
topicName: cstring,
) {.ffi.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding
return ok(fmt"/waku/2/{$topicName}")
let topic = (await ctx.myLib[].waku.buildPubsubTopic($topicName)).valueOr:
return err(error)
return ok(string(topic))

View File

@ -1,13 +1,10 @@
import logos_delivery/waku/compat/option_valueor
import std/[json, sugar, strutils, options]
import chronos, chronicles, results, stew/byteutils, ffi
import std/[json, sugar, options]
import chronos, chronicles, results, ffi
import
logos_delivery/waku/waku,
logos_delivery,
library/utils,
logos_delivery/waku/waku_core/peers,
logos_delivery/waku/waku_core/message/digest,
logos_delivery/waku/waku_store/common,
logos_delivery/waku/waku_store/client,
logos_delivery/waku/common/paging,
library/declare_lib
@ -83,13 +80,10 @@ proc waku_store_query(
let storeQueryRequest = ?fromJsonNode(jsonContentRes.get())
let peer = peers.parsePeerInfo(($peerAddr).split(",")).valueOr:
return err("StoreRequest failed to parse peer addr: " & $error)
let queryResponse = (
await ctx.myLib[].waku.node.wakuStoreClient.query(storeQueryRequest, peer)
await ctx.myLib[].waku.storeQuery(storeQueryRequest, $peerAddr, int(timeoutMs))
).valueOr:
return err("StoreRequest failed store query: " & $error)
return err("StoreRequest failed store query: " & error)
let res = $(%*(queryResponse.toHex()))
return ok(res) ## returning the response in json format

View File

@ -71,6 +71,36 @@ extern "C"
void *userData,
const char *messageJson);
// --- Reliable Channels API (stable surface) ---
// Create a reliable channel. Returns the channel id.
int logosdelivery_channel_create(void *ctx,
FFICallBack callback,
void *userData,
const char *channelId,
const char *contentTopic,
const char *senderId);
// Send a message on a reliable channel.
// messageJson: { "payload": "base64-encoded-payload", "ephemeral": false }
// Returns a request ID that can be used to track delivery.
int logosdelivery_channel_send(void *ctx,
FFICallBack callback,
void *userData,
const char *channelId,
const char *messageJson);
// Close a reliable channel: stops its SDS loops; persisted state survives, so
// re-creating the channel restores it.
int logosdelivery_channel_close(void *ctx,
FFICallBack callback,
void *userData,
const char *channelId);
// Channel lifecycle events are delivered through the event callback set via
// logosdelivery_set_event_callback: "onChannelMessageReceived" (payload
// base64-encoded), "onChannelMessageSent", "onChannelMessageError".
// Sets a callback that will be invoked whenever an event occurs.
// It is crucial that the passed callback is fast, non-blocking and potentially thread-safe.
void logosdelivery_set_event_callback(void *ctx,

View File

@ -31,7 +31,8 @@ include
./kernel_api/protocols/relay_api,
./kernel_api/protocols/store_api,
./kernel_api/protocols/lightpush_api,
./kernel_api/protocols/filter_api
./kernel_api/protocols/filter_api,
./channels_api/channel_api
################################################################################
### Exported procs (former libwaku API)

View File

@ -1,11 +1,11 @@
import std/json
import chronos, chronicles, results, ffi
import logos_delivery/waku/common/base64
import
logos_delivery,
logos_delivery/waku/node/waku_node,
logos_delivery/waku/events/message_events,
logos_delivery/api/types,
logos_delivery/waku/events/[message_events, health_events],
logos_delivery/waku/api/events/health_events,
tools/confutils/conf_from_json,
../declare_lib,
../json_event
@ -125,6 +125,40 @@ proc logosdelivery_start_node(
chronicles.error "ConnectionStatusChange.listen failed", err = $error
return err("ConnectionStatusChange.listen failed: " & $error)
let channelReceivedListener = ChannelMessageReceivedEvent.listen(
ctx.myLib[].waku.brokerCtx,
proc(event: ChannelMessageReceivedEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onChannelMessageReceived"):
$(
%*{
"eventType": "channel_message_received",
"channelId": string(event.channelId),
"senderId": $event.senderId,
"payload": string(base64.encode(event.payload)),
}
),
).valueOr:
chronicles.error "ChannelMessageReceivedEvent.listen failed", err = $error
return err("ChannelMessageReceivedEvent.listen failed: " & $error)
let channelSentListener = ChannelMessageSentEvent.listen(
ctx.myLib[].waku.brokerCtx,
proc(event: ChannelMessageSentEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onChannelMessageSent"):
$newJsonEvent("channel_message_sent", event),
).valueOr:
chronicles.error "ChannelMessageSentEvent.listen failed", err = $error
return err("ChannelMessageSentEvent.listen failed: " & $error)
let channelErrorListener = ChannelMessageErrorEvent.listen(
ctx.myLib[].waku.brokerCtx,
proc(event: ChannelMessageErrorEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onChannelMessageError"):
$newJsonEvent("channel_message_error", event),
).valueOr:
chronicles.error "ChannelMessageErrorEvent.listen failed", err = $error
return err("ChannelMessageErrorEvent.listen failed: " & $error)
(await ctx.myLib[].start()).isOkOr:
let errMsg = $error
chronicles.error "START_NODE failed", err = errMsg
@ -142,6 +176,9 @@ proc logosdelivery_stop_node(
await MessagePropagatedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
await MessageReceivedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
await EventConnectionStatusChange.dropAllListeners(ctx.myLib[].waku.brokerCtx)
await ChannelMessageReceivedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
await ChannelMessageSentEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
await ChannelMessageErrorEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
(await ctx.myLib[].stop()).isOkOr:
let errMsg = $error

View File

@ -0,0 +1,203 @@
import std/options
import chronos, results
import brokers/event_broker
import logos_delivery/api/types as api_types
import logos_delivery/waku/waku_core/topics/pubsub_topic
import logos_delivery/waku/waku_core/message
import logos_delivery/waku/waku_store/common as store_types
export event_broker
export api_types, pubsub_topic, store_types
type IKernel* = ref object of RootObj
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageSeenEvent* = object
topic*: PubsubTopic
message*: WakuMessage
# --- topic construction ---
method buildContentTopic*(
self: IKernel, appName: string, appVersion: uint32, name: string, encoding: string
): Future[Result[ContentTopic, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.buildContentTopic not implemented")
method buildPubsubTopic*(
self: IKernel, topicName: string
): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.buildPubsubTopic not implemented")
method defaultPubsubTopic*(
self: IKernel
): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.defaultPubsubTopic not implemented")
# --- relay ---
method relayPublish*(
self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
): Future[Result[int, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayPublish not implemented")
method relaySubscribe*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relaySubscribe not implemented")
method relayUnsubscribe*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayUnsubscribe not implemented")
method relayAddProtectedShard*(
self: IKernel, clusterId: uint16, shardId: uint16, publicKey: string
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayAddProtectedShard not implemented")
method relayConnectedPeers*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayConnectedPeers not implemented")
method relayPeersInMesh*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayPeersInMesh not implemented")
# --- filter ---
method filterSubscribe*(
self: IKernel,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: string,
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.filterSubscribe not implemented")
method filterUnsubscribe*(
self: IKernel,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: string,
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.filterUnsubscribe not implemented")
method filterUnsubscribeAll*(
self: IKernel, peer: string
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.filterUnsubscribeAll not implemented")
# --- lightpush ---
method lightpushPublish*(
self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.lightpushPublish not implemented")
# --- store ---
method storeQuery*(
self: IKernel, request: StoreQueryRequest, peer: string, timeoutMs: int
): Future[Result[StoreQueryResponse, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.storeQuery not implemented")
# --- peer management ---
method connect*(
self: IKernel, peers: seq[string], timeoutMs: uint32
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.connect not implemented")
method disconnectPeerById*(
self: IKernel, peerId: string
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.disconnectPeerById not implemented")
method disconnectAllPeers*(
self: IKernel
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.disconnectAllPeers not implemented")
method dialPeer*(
self: IKernel, peerAddr: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.dialPeer not implemented")
method dialPeerById*(
self: IKernel, peerId: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.dialPeerById not implemented")
method peerIdsFromPeerstore*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.peerIdsFromPeerstore not implemented")
method connectedPeersInfo*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.connectedPeersInfo not implemented")
method connectedPeers*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.connectedPeers not implemented")
method peerIdsByProtocol*(
self: IKernel, protocol: string
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.peerIdsByProtocol not implemented")
# --- discovery ---
method dnsDiscovery*(
self: IKernel, enrTreeUrl: string, nameServer: string, timeoutMs: int
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.dnsDiscovery not implemented")
method discv5UpdateBootnodes*(
self: IKernel, bootnodes: seq[string]
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.discv5UpdateBootnodes not implemented")
method startDiscv5*(
self: IKernel
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.startDiscv5 not implemented")
method stopDiscv5*(
self: IKernel
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.stopDiscv5 not implemented")
method peerExchangeRequest*(
self: IKernel, numPeers: uint64
): Future[Result[int, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.peerExchangeRequest not implemented")
# --- debug / info ---
method version*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.version not implemented")
method listenAddresses*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.listenAddresses not implemented")
method myEnr*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.myEnr not implemented")
method myPeerId*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.myPeerId not implemented")
method metrics*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.metrics not implemented")
method pingPeer*(
self: IKernel, peerAddr: string, timeoutMs: int
): Future[Result[int64, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.pingPeer not implemented")

View File

@ -0,0 +1,33 @@
## `LogosDelivery` is the project entry point. It is a pure concentrator: it
## owns exactly one instance of each API layer
##
## Waku <- MessagingClient <- ReliableChannelManager
##
## and chains them together (each layer drives the one below it). Every layer
## keeps its own, separate public API — `LogosDelivery` only wires them up and
## drives the shared `new` / `start` / `stop` lifecycle.
{.push raises: [].}
import results, chronos
import brokers/event_broker
import types as api_types
export api_types, event_broker
type
## Entry point. Holds one instance of each API layer.
ILogosDelivery* = ref object of RootObj
EventBroker:
type EventConnectionStatusChange* = object
connectionStatus*: ConnectionStatus
method start*(self: ILogosDelivery): Future[Result[void, string]] {.async, base.} =
return err("ILogosDelivery.start not implemented")
method stop*(self: ILogosDelivery): Future[Result[void, string]] {.async, base.} =
return err("ILogosDelivery.stop not implemented")
method isOnline*(self: ILogosDelivery): Future[Result[bool, string]] {.async, base.} =
return err("ILogosDelivery.isOnline not implemented")

View File

@ -0,0 +1,49 @@
import chronos, results
import brokers/event_broker
import logos_delivery/api/types as api_types
import logos_delivery/waku/waku_core/message
export event_broker, api_types
type IMessagingClient* = ref object of RootObj
EventBroker:
# Event emitted when a message is sent to the network
type MessageSentEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message send operation fails
type MessageErrorEvent* = object
requestId*: RequestId
messageHash*: string
error*: string
EventBroker:
# Confirmation that a message has been correctly delivered to some neighbouring nodes.
type MessagePropagatedEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message is received via Waku
type MessageReceivedEvent* = object
messageHash*: string
message*: WakuMessage
method subscribe*(
self: IMessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async: (raises: []), base.} =
return err("Interface IMessagingClient.subscribe not implemented")
method unsubscribe*(
self: IMessagingClient, contentTopic: ContentTopic
): Result[void, string] {.base, raises: [].} =
return err("Interface IMessagingClient.unsubscribe not implemented")
method send*(
self: IMessagingClient, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: []), base.} =
return err("Interface IMessagingClient.send not implemented")

View File

@ -0,0 +1,70 @@
import chronos, results
import brokers/event_broker
import logos_delivery/api/types as api_types
import logos_delivery/channels/types as channel_types
# The channel layer re-uses the messaging-layer message events (the `requestId`
# is shared across layers), so it re-exports the messaging interface's event
# surface and only adds the channel-level events that have no lower-layer
# analogue (reassembled payload / senderId / channelId).
import logos_delivery/api/messaging_client_api
export event_broker, api_types
export channel_types, messaging_client_api
type
IReliableChannelManager* = ref object of RootObj
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.}
## Egress dispatch boundary. Typically wraps `MessagingClient.send`;
## tests inject a fake that records calls and returns canned
## `RequestId`s so the send state machine can be exercised end-to-end
## without a network.
EventBroker:
type ChannelMessageReceivedEvent* = object
channelId*: ChannelId
senderId*: SdsParticipantID
payload*: seq[byte]
EventBroker:
## Emitted when every segment of a channel-level `send()` reached
## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the
## `requestId` is the channel-layer parent returned by `send()`.
type ChannelMessageSentEvent* = object
channelId*: ChannelId
requestId*: RequestId
EventBroker:
## Emitted when a channel-level `send()` finalises with at least one
## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`.
type ChannelMessageErrorEvent* = object
channelId*: ChannelId
requestId*: RequestId
error*: string
method createReliableChannel*(
self: IReliableChannelManager,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] {.base.} =
return err("Interface IReliableChannelManager.createReliableChannel not implemented")
method closeChannel*(
self: IReliableChannelManager, channelId: ChannelId
): Future[Result[void, string]] {.async: (raises: []), base.} =
return err("Interface IReliableChannelManager.closeChannel not implemented")
method send*(
self: IReliableChannelManager,
channelId: ChannelId,
appPayload: seq[byte],
ephemeral: bool = false,
): Future[Result[RequestId, string]] {.async: (raises: []), base.} =
return err("Interface IReliableChannelManager.send not implemented")

View File

@ -0,0 +1,89 @@
## Reliable Channel layer API — channel lifecycle
## (createReliableChannel / closeChannel).
import std/[options, tables]
import results, chronos, chronicles
import logos_delivery/api/types
import logos_delivery/channels/reliable_channel_manager
import logos_delivery/channels/reliable_channel
import logos_delivery/waku/persistency/sds_persistency
# ReliableChannel, SendHandler, config and wire-version markers.
export reliable_channel
const SdsJobId = "sds"
## One persistency job shared by every channel's SDS state; rows are
## keyed by channelId.
proc sdsPersistence(): Option[Persistence] =
## SDS backend from the Persistency singleton; memory-only fallback when
## it is unavailable (e.g. unit tests).
let p = Persistency.instance().valueOr:
info "SDS persistence disabled, running memory-only", reason = $error
return none(Persistence)
let job = p.openJob(SdsJobId).valueOr:
warn "SDS persistence disabled, could not open persistency job",
jobId = SdsJobId, reason = $error
return none(Persistence)
return some(newSdsPersistence(job))
proc createReliableChannel*(
self: ReliableChannelManager,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] =
## Spec entry point. The `sendHandler` and `rng` the channel needs are
## sourced from the owning `ReliableChannelManager` rather than passed
## per call. Encryption is wired up through the `Encrypt`/`Decrypt`
## request brokers — the application installs its own providers
## (or `setNoopEncryption()`) before traffic flows.
##
## `sendHandler` defaults to the manager's default (constructed at mount
## from `MessagingClient.send`); tests pass a fake to bypass the network.
if self.channels.hasKey(channelId):
return err("channel already exists: " & channelId)
let segConfig = SegmentationConfig(
segmentSizeBytes: DefaultSegmentSizeBytes,
enableReedSolomon: false,
persistence: nil,
)
let sdsConfig = SdsConfig(
acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs,
maxRetransmissions: DefaultMaxRetransmissions,
causalHistorySize: DefaultCausalHistorySize,
persistence: sdsPersistence(),
)
let rateConfig = RateLimitConfig(
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
)
let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler
let chn = ReliableChannel.new(
sendHandler = effectiveSendHandler,
channelId = channelId,
contentTopic = contentTopic,
senderId = senderId,
segConfig = segConfig,
sdsConfig = sdsConfig,
rateConfig = rateConfig,
brokerCtx = self.brokerCtx,
)
self.channels[channelId] = chn
return ok(channelId)
proc closeChannel*(
self: ReliableChannelManager, channelId: ChannelId
): Future[Result[void, string]] {.async: (raises: []).} =
## Stops the channel's SDS loops and releases the channel. Persisted SDS
## state survives, so re-creating the channel restores it.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
self.channels.del(channelId)
await chn.stop()
return ok()

View File

@ -0,0 +1,21 @@
## Reliable Channel layer API — channel send operation.
import std/tables
import results, chronos
import logos_delivery/api/types
import logos_delivery/channels/reliable_channel_manager
import logos_delivery/channels/reliable_channel
proc send*(
self: ReliableChannelManager,
channelId: ChannelId,
appPayload: seq[byte],
ephemeral: bool = false,
): Future[Result[RequestId, string]] {.async: (raises: []).} =
## Spec-level entry point. Looks the channel up by id and delegates
## to `ReliableChannel.send`, which exposes the visible pipeline
## segmentation -> sds -> rate_limit_manager -> encryption.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
return await chn.send(appPayload, ephemeral)

View File

@ -1,39 +0,0 @@
## Reliable Channel event types emitted to API consumers.
##
## Lifecycle events for individual segments (sent / propagated / errored)
## are the same as the network-level ones the MessagingClient already
## emits — `requestId` is shared across layers — so we just re-export
## `waku/events/message_events` and avoid declaring duplicates.
##
## Only the channel-level `MessageReceivedEvent` carries data that has
## no analogue in the lower layer (reassembled application payload,
## senderId, channelId), so it lives here.
import logos_delivery/waku/events/message_events as waku_message_events
import brokers/event_broker
import ./types as channel_types
export waku_message_events, channel_types, event_broker
EventBroker:
type ChannelMessageReceivedEvent* = object
channelId*: ChannelId
senderId*: SdsParticipantID
payload*: seq[byte]
EventBroker:
## Emitted when every segment of a channel-level `send()` reached
## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the
## `requestId` is the channel-layer parent returned by `send()`.
type ChannelMessageSentEvent* = object
channelId*: ChannelId
requestId*: RequestId
EventBroker:
## Emitted when a channel-level `send()` finalises with at least one
## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`.
type ChannelMessageErrorEvent* = object
channelId*: ChannelId
requestId*: RequestId
error*: string

View File

@ -22,18 +22,18 @@ import stew/byteutils
import libp2p/crypto/crypto as libp2p_crypto
import logos_delivery/api/types
import logos_delivery/api/reliable_channel_manager_api
import logos_delivery/messaging/delivery_service/send_service
import logos_delivery/waku/waku_core/topics
import ./events
import ./segmentation/segmentation
import ./scalable_data_sync/scalable_data_sync
import ./rate_limit_manager/rate_limit_manager
import ./encryption/encryption
export
types, send_service, events, segmentation, scalable_data_sync, rate_limit_manager,
encryption
types, reliable_channel_manager_api, send_service, segmentation,
scalable_data_sync, rate_limit_manager, encryption
const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
## Wire-format spec marker for the Reliable Channel layer, as defined
@ -44,14 +44,6 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
## on breaking on-the-wire changes; implementations pin one version.
type
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.}
## Egress dispatch boundary. Typically wraps `MessagingClient.send`;
## tests inject a fake that records calls and returns canned
## `RequestId`s so the send state machine can be exercised end-to-end
## without a network.
MessagePersistence {.pure.} = enum
Persistent
Ephemeral

View File

@ -13,35 +13,29 @@ import stew/byteutils
import brokers/broker_context
import logos_delivery/waku/events/message_events as waku_message_events
import logos_delivery/messaging/messaging_client
import logos_delivery/messaging/api/send
import logos_delivery/api/types
import logos_delivery/waku/waku_core/topics
import logos_delivery/waku/persistency/sds_persistency
import logos_delivery/api/reliable_channel_manager_api
import ./reliable_channel
import ./encryption/noop_encryption
export reliable_channel
const SdsJobId = "sds"
## One persistency job shared by every channel's SDS state; rows are
## keyed by channelId.
type
ReliableChannelManagerConf* = object
## Per-layer config object for the reliable
## channel API. Placeholder for now (segmentation / SDS / rate-limit defaults
## will move here in a follow-up PR); kept so each layer owns its own config.
ReliableChannelManager* = ref object
channels: Table[ChannelId, ReliableChannel]
ReliableChannelManager* = ref object of IReliableChannelManager
channels*: Table[ChannelId, ReliableChannel] ## read by `channels/api.nim`
messagingClient: MessagingClient ## The channel layer chains onto messaging.
sendHandler: SendHandler
sendHandler*: SendHandler
## Default egress dispatch for channels created through this manager.
## Built in `new` as a closure over `MessagingClient.send` so the channel
## layer itself stays callable-only.
brokerCtx: BrokerContext
brokerCtx*: BrokerContext
proc new*(
T: type ReliableChannelManager,
@ -82,96 +76,6 @@ proc stop*(self: ReliableChannelManager) {.async.} =
await chn.stop()
self.channels.clear()
proc sdsPersistence(): Option[Persistence] =
## SDS backend from the Persistency singleton; memory-only fallback when
## it is unavailable (e.g. unit tests).
let p = Persistency.instance().valueOr:
info "SDS persistence disabled, running memory-only", reason = $error
return none(Persistence)
let job = p.openJob(SdsJobId).valueOr:
warn "SDS persistence disabled, could not open persistency job",
jobId = SdsJobId, reason = $error
return none(Persistence)
return some(newSdsPersistence(job))
proc createReliableChannel*(
self: ReliableChannelManager,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] =
## Spec entry point. The `sendHandler` and `rng` the channel needs are
## sourced from the owning `ReliableChannelManager` rather than passed
## per call. Encryption is wired up through the `Encrypt`/`Decrypt`
## request brokers — the application installs its own providers
## (or `setNoopEncryption()`) before traffic flows.
##
## Segmentation, SDS and rate-limit configs will eventually be read
## from the node's `NodeConfig`. Defaults for now.
##
## `sendHandler` defaults to the manager's default (constructed at mount
## from `MessagingClient.send`); tests pass a fake to bypass the network.
if self.channels.hasKey(channelId):
return err("channel already exists: " & channelId)
let segConfig = SegmentationConfig(
segmentSizeBytes: DefaultSegmentSizeBytes,
enableReedSolomon: false,
persistence: nil,
)
let sdsConfig = SdsConfig(
acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs,
maxRetransmissions: DefaultMaxRetransmissions,
causalHistorySize: DefaultCausalHistorySize,
persistence: sdsPersistence(),
)
let rateConfig = RateLimitConfig(
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
)
let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler
let chn = ReliableChannel.new(
sendHandler = effectiveSendHandler,
channelId = channelId,
contentTopic = contentTopic,
senderId = senderId,
segConfig = segConfig,
sdsConfig = sdsConfig,
rateConfig = rateConfig,
brokerCtx = self.brokerCtx,
)
self.channels[channelId] = chn
return ok(channelId)
proc closeChannel*(
self: ReliableChannelManager, channelId: ChannelId
): Future[Result[void, string]] {.async: (raises: []).} =
## Stops the channel's SDS loops and releases the channel. Persisted SDS
## state survives, so re-creating the channel restores it.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
self.channels.del(channelId)
await chn.stop()
return ok()
proc send*(
self: ReliableChannelManager,
channelId: ChannelId,
appPayload: seq[byte],
ephemeral: bool = false,
): Future[Result[RequestId, string]] {.async: (raises: []).} =
## Spec-level entry point. Looks the channel up by id and delegates
## to `ReliableChannel.send`, which exposes the visible pipeline
## segmentation -> sds -> rate_limit_manager -> encryption.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
return await chn.send(appPayload, ephemeral)
## Inbound messages are not handed to the manager by direct call. Each
## `ReliableChannel` installs its own `MessageReceivedEvent` listener
## in `ReliableChannel.new`, filters by spec marker and `contentTopic`,

View File

@ -11,12 +11,45 @@
import results, chronos, chronicles
import logos_delivery/api/logos_delivery_api
export logos_delivery_api
# Each layer has a core module (type + new/start/stop) and an api/ folder whose
# modules each implement a differentiated set of operations, plus an events
# surface. The concentrator re-exports them so library consumers get the full
# surface from `import logos_delivery`. (The per-layer `events` modules share a
# stem, so they are imported under aliases.)
# Waku layer
import logos_delivery/waku/waku
export waku
import
logos_delivery/waku/api/[
topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health,
ping,
]
export
topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health, ping
# `MessageSeenEvent` is surfaced via `export waku` (Kernel interface); the
# remaining waku health events live here.
import logos_delivery/waku/api/events/health_events
export health_events
# Messaging layer
import logos_delivery/messaging/messaging_client
export messaging_client
import logos_delivery/messaging/api/[subscription, send]
export subscription, send
# Message* events are surfaced via `export messaging_client` (messaging interface).
# Reliable Channel layer
import logos_delivery/channels/reliable_channel_manager
export reliable_channel_manager
import logos_delivery/channels/api/channel_lifecycle
export channel_lifecycle
import logos_delivery/channels/api/send as channel_send
export channel_send
# ChannelMessage* events are surfaced via `export reliable_channel_manager`.
import logos_delivery/waku/factory/waku_conf
import logos_delivery/waku/factory/app_callbacks
@ -35,7 +68,8 @@ type
messaging*: MessagingClientConf
reliableChannel*: ReliableChannelManagerConf
LogosDelivery* = ref object ## Entry point. Holds one instance of each API layer.
LogosDelivery* = ref object of ILogosDelivery
## Entry point. Holds one instance of each API layer.
waku*: Waku
messagingClient*: MessagingClient
reliableChannelManager*: ReliableChannelManager
@ -79,7 +113,7 @@ proc new*(
)
)
proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
method start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
## Starts each layer bottom-up: transport first, then messaging, then channels.
if self.waku.isNil():
return err("Waku node is not initialized")
@ -99,7 +133,7 @@ proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
return ok()
proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
method stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
## Stops in reverse order so higher layers drain before their dependencies.
await self.reliableChannelManager.stop()
await self.messagingClient.stop()
@ -109,7 +143,7 @@ proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
return ok()
proc isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} =
method isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} =
if self.waku.isNil():
return err("Waku node is not initialized")
return ok(self.waku.healthMonitor.onlineMonitor.amIOnline())

View File

@ -0,0 +1,34 @@
## Messaging layer API — send operation.
import results, chronos, chronicles
import logos_delivery/api/types
import logos_delivery/messaging/messaging_client
import logos_delivery/waku/node/[waku_node, subscription_manager]
import logos_delivery/messaging/delivery_service/send_service
import logos_delivery/messaging/delivery_service/send_service/delivery_task
proc send*(
self: MessagingClient, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
## High-level messaging API send. Auto-subscribes to the content topic
## (so the local node sees its own gossipsub broadcast), builds a
## `DeliveryTask`, and hands it to the send service. Returns the request
## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`.
?self.checkApiAvailability()
let isSubbed =
self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
warn "Failed to auto-subscribe", error = error
return err("Failed to auto-subscribe before sending: " & error)
let requestId = RequestId.new(self.node.rng)
let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr:
return err("MessagingClient.send: Failed to create delivery task: " & error)
asyncSpawn self.sendService.send(deliveryTask)
return ok(requestId)

View File

@ -0,0 +1,18 @@
## Messaging layer API — subscription operations.
import results, chronos
import logos_delivery/api/types
import logos_delivery/messaging/messaging_client
import logos_delivery/waku/node/[waku_node, subscription_manager]
proc subscribe*(
self: MessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async.} =
?self.checkApiAvailability()
return self.node.subscriptionManager.subscribe(contentTopic)
proc unsubscribe*(
self: MessagingClient, contentTopic: ContentTopic
): Result[void, string] =
?self.checkApiAvailability()
return self.node.subscriptionManager.unsubscribe(contentTopic)

View File

@ -13,11 +13,13 @@ import
waku_store/client,
waku_store/common,
waku_filter_v2/client,
events/message_events,
events/health_events,
waku_node,
node/subscription_manager,
]
import
logos_delivery/api/kernel_api, # MessageSeenEvent
logos_delivery/api/messaging_client_api, # MessageReceivedEvent
logos_delivery/api/logos_delivery_api # EventConnectionStatusChange
const MaxMessageLife = chronos.minutes(7) ## Max time we will keep track of rx messages

View File

@ -18,8 +18,8 @@ import
waku_rln_relay/rln_relay,
waku_lightpush/client,
waku_lightpush/callbacks,
events/message_events,
]
import logos_delivery/api/messaging_client_api
logScope:
topics = "send service"

View File

@ -1,10 +1,14 @@
## Messaging layer core: the `MessagingClient` type plus its construction and
## lifecycle. The public operations (subscribe / unsubscribe / send) live in
## `messaging/api.nim`.
import results, chronos
import chronicles
import
logos_delivery/api/types,
logos_delivery/waku/node/[waku_node, subscription_manager],
logos_delivery/messaging/delivery_service/[recv_service, send_service],
logos_delivery/messaging/delivery_service/send_service/delivery_task
logos_delivery/api/messaging_client_api,
logos_delivery/waku/node/waku_node,
logos_delivery/messaging/delivery_service/[recv_service, send_service]
# Surfaces the messaging API interface (and its Message* events) to consumers.
export messaging_client_api
type
MessagingClientConf* = object
@ -13,8 +17,8 @@ type
## follow-up PR. Today it only carries the p2p reliability toggle.
useP2PReliability*: bool
MessagingClient* = ref object
node: WakuNode
MessagingClient* = ref object of IMessagingClient
node*: WakuNode ## Waku core driven by this layer; read by `messaging/api.nim`.
sendService*: SendService
recvService*: RecvService
started: bool
@ -43,46 +47,9 @@ proc stop*(self: MessagingClient) {.async.} =
await self.recvService.stopRecvService()
self.started = false
proc checkApiAvailability(self: MessagingClient): Result[void, string] =
proc checkApiAvailability*(self: MessagingClient): Result[void, string] =
## Shared guard for the api operation module.
if self.isNil():
return err("MessagingClient is not initialized")
return ok()
proc subscribe*(
self: MessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async.} =
?checkApiAvailability(self)
return self.node.subscriptionManager.subscribe(contentTopic)
proc unsubscribe*(
self: MessagingClient, contentTopic: ContentTopic
): Result[void, string] =
?checkApiAvailability(self)
return self.node.subscriptionManager.unsubscribe(contentTopic)
proc send*(
self: MessagingClient, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
## High-level messaging API send. Auto-subscribes to the content topic
## (so the local node sees its own gossipsub broadcast), builds a
## `DeliveryTask`, and hands it to the send service. Returns the request
## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`.
let isSubbed =
self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
warn "Failed to auto-subscribe", error = error
return err("Failed to auto-subscribe before sending: " & error)
let requestId = RequestId.new(self.node.rng)
let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr:
return err("MessagingClient.send: Failed to create delivery task: " & error)
asyncSpawn self.sendService.send(deliveryTask)
return ok(requestId)

View File

@ -0,0 +1,36 @@
## Waku layer API — debug / info operations.
{.push raises: [].}
import results, chronos, chronicles, metrics
import eth/p2p/discoveryv5/enr
import logos_delivery/waku/waku
import logos_delivery/waku/[waku_core, node/waku_node]
proc version*(self: Waku): Future[Result[string, string]] {.async.} =
return ok(WakuNodeVersionString)
proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} =
try:
return ok(self.node.info().listenAddresses)
except CatchableError as e:
return err(e.msg)
proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} =
try:
return ok(self.node.enr.toURI())
except CatchableError as e:
return err(e.msg)
proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} =
try:
return ok($self.node.peerId())
except CatchableError as e:
return err(e.msg)
proc metrics*(self: Waku): Future[Result[string, string]] {.async.} =
{.gcsafe.}:
try:
return ok(defaultRegistry.toText())
except CatchableError as e:
return err(e.msg)

View File

@ -0,0 +1,76 @@
## Waku layer API — discovery operations (DNS, discv5, peer exchange).
{.push raises: [].}
import std/[net, sequtils]
import results, chronos, chronicles
import logos_delivery/waku/waku
import
logos_delivery/waku/[
waku_core,
node/waku_node,
node/waku_node/peer_exchange,
discovery/waku_dnsdisc,
discovery/waku_discv5,
]
proc dnsDiscovery*(
self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int
): Future[Result[seq[string], string]] {.async.} =
try:
let dnsNameServers = @[parseIpAddress(nameServer)]
let discoveredPeers = (
await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers)
).valueOr:
return err("failed discovering peers from DNS: " & $error)
var multiAddresses = newSeq[string]()
for discPeer in discoveredPeers:
for address in discPeer.addrs:
multiAddresses.add($address & "/p2p/" & $discPeer)
return ok(multiAddresses)
except CatchableError as e:
return err(e.msg)
proc discv5UpdateBootnodes*(
self: Waku, bootnodes: string
): Future[Result[bool, string]] {.async.} =
## `bootnodes` is a JSON array of ENRs, e.g. `["enr:...", "enr:..."]`.
try:
if self.wakuDiscv5.isNil():
return err("discv5 not started")
self.wakuDiscv5.updateBootstrapRecords(bootnodes).isOkOr:
return err("error in discv5UpdateBootnodes: " & $error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
try:
if self.wakuDiscv5.isNil():
return err("discv5 not started")
(await self.wakuDiscv5.start()).isOkOr:
return err("error starting discv5: " & $error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
try:
if self.wakuDiscv5.isNil():
return err("discv5 not started")
await self.wakuDiscv5.stop()
return ok(true)
except CatchableError as e:
return err(e.msg)
proc peerExchangeRequest*(
self: Waku, numPeers: uint64
): Future[Result[int, string]] {.async.} =
try:
let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr:
return err("failed peer exchange: " & $error)
return ok(numPeersRecv)
except CatchableError as e:
return err(e.msg)

View File

@ -0,0 +1,3 @@
import ./[filter_subscribe_events, health_events, peer_events, discovery_events]
export filter_subscribe_events, health_events, peer_events, discovery_events

View File

@ -6,10 +6,8 @@ import logos_delivery/waku/waku_core/topics
export protocol_health, topic_health
# Notify health changes to node connectivity
EventBroker:
type EventConnectionStatusChange* = object
connectionStatus*: ConnectionStatus
# Note: `EventConnectionStatusChange` lives in `logos_delivery/api/logos_delivery_api`
# (the top-level orchestrator interface owns the node-connectivity event).
# Notify health changes to a subscribed topic
# TODO: emit content topic health change events when subscribe/unsubscribe

View File

@ -0,0 +1,88 @@
## Waku layer API — filter (light client) operations.
import logos_delivery/waku/compat/option_valueor
{.push raises: [].}
import std/options
import results, chronos, chronicles
import logos_delivery/waku/waku
import
logos_delivery/waku/[
waku_core,
waku_core/subscription/push_handler,
node/waku_node,
node/waku_node/filter,
node/peer_manager,
waku_filter_v2/client,
waku_filter_v2/common,
]
const FilterOpTimeout = 5.seconds
proc filterSubscribe*(
self: Waku,
pubsubTopic: PubsubTopic,
contentTopics: seq[ContentTopic],
pushHandler: FilterPushHandler,
): Future[Result[bool, string]] {.async.} =
## Registers `pushHandler` for incoming filtered messages, selects a filter
## service peer, and subscribes.
try:
if self.node.wakuFilterClient.isNil():
return err("wakuFilterClient is not mounted")
self.node.wakuFilterClient.registerPushHandler(pushHandler)
let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
return err("could not find peer with WakuFilterSubscribeCodec when subscribing")
let subFut = self.node.filterSubscribe(some(pubsubTopic), contentTopics, peer)
if not await subFut.withTimeout(FilterOpTimeout):
return err("filter subscription timed out")
subFut.read().isOkOr:
return err($error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc filterUnsubscribe*(
self: Waku, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]
): Future[Result[bool, string]] {.async.} =
## Selects a filter service peer and unsubscribes the given content topics.
try:
if self.node.wakuFilterClient.isNil():
return err("wakuFilterClient is not mounted")
let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
return err("could not find peer with WakuFilterSubscribeCodec when unsubscribing")
let unsubFut = self.node.filterUnsubscribe(some(pubsubTopic), contentTopics, peer)
if not await unsubFut.withTimeout(FilterOpTimeout):
return err("filter un-subscription timed out")
unsubFut.read().isOkOr:
return err($error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc filterUnsubscribeAll*(self: Waku): Future[Result[bool, string]] {.async.} =
## Selects a filter service peer and unsubscribes from everything.
try:
if self.node.wakuFilterClient.isNil():
return err("wakuFilterClient is not mounted")
let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
return
err("could not find peer with WakuFilterSubscribeCodec when unsubscribing all")
let unsubFut = self.node.filterUnsubscribeAll(peer)
if not await unsubFut.withTimeout(FilterOpTimeout):
return err("filter un-subscription all timed out")
unsubFut.read().isOkOr:
return err($error)
return ok(true)
except CatchableError as e:
return err(e.msg)

View File

@ -0,0 +1,13 @@
## Waku layer API — health / connectivity.
{.push raises: [].}
import results, chronos, chronicles
import logos_delivery/waku/waku
import logos_delivery/waku/[node/health_monitor, node/health_monitor/online_monitor]
proc isOnline*(self: Waku): Future[Result[bool, string]] {.async.} =
try:
return ok(self.healthMonitor.onlineMonitor.amIOnline())
except CatchableError as e:
return err(e.msg)

View File

@ -0,0 +1,37 @@
## Waku layer API — lightpush (light client publish) operations.
import logos_delivery/waku/compat/option_valueor
{.push raises: [].}
import results, chronos, chronicles
import logos_delivery/waku/waku
import
logos_delivery/waku/[
waku_core,
waku_core/codecs,
node/waku_node,
node/peer_manager,
waku_lightpush_legacy/client,
]
proc lightpushPublish*(
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[Result[string, string]] {.async.} =
## Selects a lightpush service peer and publishes; returns the message hash.
try:
if self.node.wakuLegacyLightpushClient.isNil():
return err("wakuLegacyLightpushClient is not mounted")
let remotePeer = self.node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
return err("failed to lightpublish message, no suitable remote peers")
let msgHashHex = (
await self.node.wakuLegacyLightpushClient.publish(
pubsubTopic, message, remotePeer
)
).valueOr:
return err($error)
return ok(msgHashHex)
except CatchableError as e:
return err(e.msg)

View File

@ -0,0 +1,112 @@
## Waku layer API — peer management operations.
{.push raises: [].}
import std/[options, sequtils, strutils]
import results, chronos, chronicles
import libp2p/[peerid, peerstore]
import logos_delivery/waku/waku
import logos_delivery/waku/[waku_core, node/waku_node, node/peer_manager]
type PeerConnInfo* = object ## structured connected-peer info for the api boundary
peerId*: string
protocols*: seq[string]
addresses*: seq[string]
proc connect*(
self: Waku, peers: seq[string], timeoutMs: uint32
): Future[Result[bool, string]] {.async.} =
try:
await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static")
return ok(true)
except CatchableError as e:
return err(e.msg)
proc disconnectPeerById*(
self: Waku, peerId: string
): Future[Result[bool, string]] {.async.} =
try:
let pId = PeerId.init(peerId).valueOr:
return err($error)
await self.node.peerManager.disconnectNode(pId)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} =
try:
await self.node.peerManager.disconnectAllPeers()
return ok(true)
except CatchableError as e:
return err(e.msg)
proc dialPeer*(
self: Waku, peerAddr: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async.} =
try:
let remotePeerInfo = parsePeerInfo(peerAddr).valueOr:
return err($error)
let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol)
if conn.isNone():
return err("failed dialing peer")
return ok(true)
except CatchableError as e:
return err(e.msg)
proc dialPeerById*(
self: Waku, peerId: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async.} =
try:
let pId = PeerId.init(peerId).valueOr:
return err($error)
let conn = await self.node.peerManager.dialPeer(pId, protocol)
if conn.isNone():
return err("failed dialing peer")
return ok(true)
except CatchableError as e:
return err(e.msg)
proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} =
try:
return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId))
except CatchableError as e:
return err(e.msg)
proc connectedPeersInfo*(
self: Waku
): Future[Result[seq[PeerConnInfo], string]] {.async.} =
## Structured info (protocols, addresses) for every connected peer.
try:
var infos: seq[PeerConnInfo]
for peer in self.node.peerManager.switch.peerStore.peers():
if peer.connectedness == Connected:
infos.add(
PeerConnInfo(
peerId: $peer.peerId,
protocols: peer.protocols,
addresses: peer.addrs.mapIt($it),
)
)
return ok(infos)
except CatchableError as e:
return err(e.msg)
proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} =
try:
let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers()
return ok(concat(inPeerIds, outPeerIds).mapIt($it))
except CatchableError as e:
return err(e.msg)
proc peerIdsByProtocol*(
self: Waku, protocol: string
): Future[Result[seq[string], string]] {.async.} =
try:
return ok(
self.node.peerManager.switch.peerStore
.peers(protocol)
.filterIt(it.connectedness == Connected)
.mapIt($it.peerId)
)
except CatchableError as e:
return err(e.msg)

View File

@ -0,0 +1,45 @@
## Waku layer API — ping operation.
{.push raises: [].}
import results, chronos, chronicles
import libp2p/protocols/ping
import libp2p/switch
import logos_delivery/waku/waku
import logos_delivery/waku/[waku_core, node/waku_node, node/waku_node/ping]
proc pingPeer*(
self: Waku, peerAddr: string, timeoutMs: int
): Future[Result[int64, string]] {.async.} =
## Pings the peer; `timeoutMs <= 0` means no timeout. Returns RTT in nanos.
try:
let peerInfo = parsePeerInfo(peerAddr).valueOr:
return err("pingPeer failed to parse peer addr: " & $error)
proc doPing(): Future[Result[Duration, string]] {.async.} =
try:
let conn =
await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
defer:
await conn.close()
let rtt = await self.node.libp2pPing.ping(conn)
if rtt == 0.nanos:
return err("could not ping peer: rtt-0")
return ok(rtt)
except CatchableError as e:
return err("could not ping peer: " & e.msg)
let pingFut = doPing()
let rtt: Duration =
if timeoutMs <= 0:
(await pingFut).valueOr:
return err(error)
else:
if not await pingFut.withTimeout(chronos.milliseconds(timeoutMs)):
return err("ping timed out")
pingFut.read().valueOr:
return err(error)
return ok(rtt.nanos)
except CatchableError as e:
return err(e.msg)

View File

@ -0,0 +1,132 @@
## Waku layer API — relay (gossipsub) operations.
{.push raises: [].}
import std/sequtils
import results, chronos, chronicles, secp256k1, stew/byteutils
import logos_delivery/waku/waku
import
logos_delivery/waku/[
waku_core,
node/waku_node,
node/waku_node/relay,
node/subscription_manager,
waku_relay/protocol,
factory/waku_conf,
factory/validator_signed,
]
proc relayPublish*(
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
): Future[Result[string, string]] {.async.} =
## Publishes `message` and returns its message hash (0x-hex).
try:
if self.node.wakuRelay.isNil():
return err("relayPublish: WakuRelay not mounted")
(await self.node.wakuRelay.publish(pubsubTopic, message)).isOkOr:
return err($error)
return ok(computeMessageHash(pubsubTopic, message).to0xHex)
except CatchableError as e:
return err(e.msg)
proc relaySubscribe*(
self: Waku,
pubsubTopic: PubsubTopic,
handler: WakuRelayHandler = WakuRelayHandler(nil),
): Future[Result[bool, string]] {.async.} =
## Subscribes to `pubsubTopic`. `handler` (optional) is invoked per message;
## pass nil to subscribe without a message callback.
try:
if self.node.wakuRelay.isNil():
return err("relaySubscribe: WakuRelay not mounted")
self.node.subscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), handler).isOkOr:
return err($error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc relayUnsubscribe*(
self: Waku, pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async.} =
try:
if self.node.wakuRelay.isNil():
return err("relayUnsubscribe: WakuRelay not mounted")
self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr:
return err($error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc relayAddProtectedShard*(
self: Waku, clusterId: uint16, shardId: uint16, publicKey: string
): Future[Result[bool, string]] {.async.} =
try:
if self.node.wakuRelay.isNil():
return err("relayAddProtectedShard: WakuRelay not mounted")
let pubKey = SkPublicKey.fromHex(publicKey).valueOr:
return err("relayAddProtectedShard: invalid public key: " & $error)
let protectedShard = ProtectedShard(shard: shardId, key: pubKey)
self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc relayConnectedPeers*(
self: Waku, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async.} =
try:
if self.node.wakuRelay.isNil():
return err("relayConnectedPeers: WakuRelay not mounted")
let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr:
return err($error)
return ok(connPeers.mapIt($it))
except CatchableError as e:
return err(e.msg)
proc relayPeersInMesh*(
self: Waku, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async.} =
try:
if self.node.wakuRelay.isNil():
return err("relayPeersInMesh: WakuRelay not mounted")
let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr:
return err($error)
return ok(meshPeers.mapIt($it))
except CatchableError as e:
return err(e.msg)
proc relayNumPeersInMesh*(
self: Waku, pubsubTopic: PubsubTopic
): Future[Result[int, string]] {.async.} =
try:
if self.node.wakuRelay.isNil():
return err("relayNumPeersInMesh: WakuRelay not mounted")
let n = self.node.wakuRelay.getNumPeersInMesh(pubsubTopic).valueOr:
return err($error)
return ok(n)
except CatchableError as e:
return err(e.msg)
proc relayNumConnectedPeers*(
self: Waku, pubsubTopic: PubsubTopic
): Future[Result[int, string]] {.async.} =
try:
if self.node.wakuRelay.isNil():
return err("relayNumConnectedPeers: WakuRelay not mounted")
let n = self.node.wakuRelay.getNumConnectedPeers(pubsubTopic).valueOr:
return err($error)
return ok(n)
except CatchableError as e:
return err(e.msg)

View File

@ -0,0 +1,29 @@
## Waku layer API — store (historical query) operations.
{.push raises: [].}
import results, chronos, chronicles
import logos_delivery/waku/waku
import
logos_delivery/waku/[waku_core, node/waku_node, waku_store/common, waku_store/client]
proc storeQuery*(
self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int
): Future[Result[StoreQueryResponse, string]] {.async.} =
try:
if self.node.wakuStoreClient.isNil():
return err("wakuStoreClient is not mounted")
let remotePeer = parsePeerInfo(peer).valueOr:
return err("storeQuery failed to parse peer addr: " & $error)
let queryFut = self.node.wakuStoreClient.query(request, remotePeer)
if not await queryFut.withTimeout(timeoutMs.milliseconds):
return err("storeQuery timed out")
let queryResponse = queryFut.read().valueOr:
return err("storeQuery failed: " & $error)
return ok(queryResponse)
except CatchableError as e:
return err(e.msg)

View File

@ -0,0 +1,27 @@
## Waku layer API — topic construction.
{.push raises: [].}
import std/strformat
import results, chronos
import logos_delivery/waku/waku
import logos_delivery/waku/waku_core
proc buildContentTopic*(
self: Waku, appName: string, appVersion: uint32, name: string, encoding: string
): Future[Result[ContentTopic, string]] {.async.} =
try:
return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}"))
except CatchableError as e:
return err(e.msg)
proc buildPubsubTopic*(
self: Waku, topicName: string
): Future[Result[PubsubTopic, string]] {.async.} =
try:
return ok(PubsubTopic(fmt"/waku/2/{topicName}"))
except CatchableError as e:
return err(e.msg)
proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} =
return ok(DefaultPubsubTopic)

View File

@ -21,7 +21,7 @@ import
import
logos_delivery/waku/waku_core,
logos_delivery/waku/node/peer_manager,
logos_delivery/waku/events/discovery_events
logos_delivery/waku/api/events/discovery_events
logScope:
topics = "waku service discovery"

View File

@ -1,9 +0,0 @@
import
./[
message_events, delivery_events, health_events, peer_events, lifecycle_events,
discovery_events,
]
export
message_events, delivery_events, health_events, peer_events, lifecycle_events,
discovery_events

View File

@ -1,35 +0,0 @@
import brokers/event_broker
import logos_delivery/api/types
import logos_delivery/waku/[waku_core/message, waku_core/topics]
export types
EventBroker:
# Event emitted when a message is sent to the network
type MessageSentEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message send operation fails
type MessageErrorEvent* = object
requestId*: RequestId
messageHash*: string
error*: string
EventBroker:
# Confirmation that a message has been correctly delivered to some neighbouring nodes.
type MessagePropagatedEvent* = object
requestId*: RequestId
messageHash*: string
EventBroker:
# Event emitted when a message is received via Waku
type MessageReceivedEvent* = object
messageHash*: string
message*: WakuMessage
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageSeenEvent* = object
topic*: PubsubTopic
message*: WakuMessage

View File

@ -1,5 +1,9 @@
import ../waku_relay, ../node/peer_manager, ../node/health_monitor/connection_status
# Re-export the modules that define the handler types below, so that consumers
# of `AppCallbacks` (e.g. the FFI library) can construct the handlers.
export waku_relay, peer_manager, connection_status
type AppCallbacks* = ref object
relayHandler*: WakuRelayHandler
topicHealthChangeHandler*: TopicHealthChangeHandler

View File

@ -39,7 +39,7 @@ import
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../waku_lightpush_legacy/common,
../common/rate_limit/setting,
../events/discovery_events
../api/events/discovery_events
## Peer persistence

View File

@ -9,11 +9,12 @@ import
libp2p/protocols/pubsub,
libp2p/protocols/pubsub/rpc/messages,
logos_delivery/api/types,
logos_delivery/api/logos_delivery_api, # EventConnectionStatusChange
logos_delivery/waku/[
waku_relay,
waku_rln_relay,
events/health_events,
events/peer_events,
api/events/health_events,
api/events/peer_events,
node/waku_node,
node/node_telemetry,
node/peer_manager,

View File

@ -20,7 +20,7 @@ import
waku_relay/protocol,
waku_enr/sharding,
waku_enr/capabilities,
events/peer_events,
api/events/peer_events,
common/nimchronos,
common/enr,
common/callbacks,

View File

@ -15,14 +15,14 @@ import
waku_filter_v2/common as filter_common,
waku_filter_v2/client as filter_client,
waku_filter_v2/protocol as filter_protocol,
events/health_events,
events/message_events,
events/peer_events,
api/events/health_events,
api/events/peer_events,
requests/health_requests,
node/peer_manager,
node/health_monitor/topic_health,
node/health_monitor/connection_status,
]
import logos_delivery/api/kernel_api # MessageSeenEvent
{.push raises: [].}

View File

@ -59,10 +59,10 @@ import
waku_mix,
requests/node_requests,
requests/health_requests,
events/health_events,
events/message_events,
events/peer_events,
api/events/health_events,
api/events/peer_events,
],
logos_delivery/api/kernel_api, # MessageSeenEvent
logos_delivery/waku/discovery/waku_kademlia,
logos_delivery/waku/net/[bound_ports, net_config],
./peer_manager,

View File

@ -33,8 +33,8 @@ import
node/waku_node,
node/subscription_manager,
node/peer_manager,
events/message_events,
]
import logos_delivery/api/kernel_api # MessageSeenEvent
export waku_relay.WakuRelayHandler

View File

@ -22,6 +22,7 @@ import
metrics/chronos_httpserver,
brokers/broker_context,
logos_delivery/api/types,
logos_delivery/api/kernel_api,
logos_delivery/waku/[
waku_core,
waku_node,
@ -57,15 +58,17 @@ import
./factory/waku_conf,
./factory/waku_state_info
# Surfaces the Kernel API interface (and its `MessageSeenEvent`) to consumers
# of the Waku layer.
export kernel_api
logScope:
topics = "wakunode waku"
# Git version in git describe format (defined at compile time)
const git_version* {.strdefine.} = "n/a"
const FilterOpTimeout = 5.seconds
type Waku* = ref object
type Waku* = ref object of IKernel
stateInfo*: WakuStateInfo
conf*: WakuConf
rng*: crypto.Rng
@ -574,418 +577,4 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
return ok()
## Kernel API realization
##
# --- topic construction ---
proc buildContentTopic*(
self: Waku, appName: string, appVersion: uint32, name: string, encoding: string
): Future[Result[ContentTopic, string]] {.async.} =
try:
return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}"))
except CatchableError as e:
return err(e.msg)
proc buildPubsubTopic*(
self: Waku, topicName: string
): Future[Result[PubsubTopic, string]] {.async.} =
try:
return ok(PubsubTopic(fmt"/waku/2/{topicName}"))
except CatchableError as e:
return err(e.msg)
proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} =
return ok(DefaultPubsubTopic)
# --- relay ---
proc relayPublish*(
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
): Future[Result[int, string]] {.async.} =
try:
if self.node.wakuRelay.isNil():
return err("relayPublish: WakuRelay not mounted")
let numPeers = (await self.node.wakuRelay.publish(pubsubTopic, message)).valueOr:
return err($error)
return ok(numPeers)
except CatchableError as e:
return err(e.msg)
proc relaySubscribe*(
self: Waku, pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async.} =
try:
if self.node.wakuRelay.isNil():
return err("relaySubscribe: WakuRelay not mounted")
self.node.subscribe(
(kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(nil)
).isOkOr:
return err($error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc relayUnsubscribe*(
self: Waku, pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async.} =
try:
if self.node.wakuRelay.isNil():
return err("relayUnsubscribe: WakuRelay not mounted")
self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr:
return err($error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc relayAddProtectedShard*(
self: Waku, clusterId: uint16, shardId: uint16, publicKey: string
): Future[Result[bool, string]] {.async.} =
try:
if self.node.wakuRelay.isNil():
return err("relayAddProtectedShard: WakuRelay not mounted")
let pubKey = SkPublicKey.fromHex(publicKey).valueOr:
return err("relayAddProtectedShard: invalid public key: " & $error)
let protectedShard = ProtectedShard(shard: shardId, key: pubKey)
self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc relayConnectedPeers*(
self: Waku, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async.} =
try:
if self.node.wakuRelay.isNil():
return err("relayConnectedPeers: WakuRelay not mounted")
let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr:
return err($error)
return ok(connPeers.mapIt($it))
except CatchableError as e:
return err(e.msg)
proc relayPeersInMesh*(
self: Waku, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async.} =
try:
if self.node.wakuRelay.isNil():
return err("relayPeersInMesh: WakuRelay not mounted")
let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr:
return err($error)
return ok(meshPeers.mapIt($it))
except CatchableError as e:
return err(e.msg)
# --- filter ---
proc filterSubscribe*(
self: Waku,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: string,
): Future[Result[bool, string]] {.async.} =
try:
if self.node.wakuFilterClient.isNil():
return err("wakuFilterClient is not mounted")
let subFut = self.node.filterSubscribe(pubsubTopic, contentTopics, peer)
if not await subFut.withTimeout(FilterOpTimeout):
return err("filter subscription timed out")
subFut.read().isOkOr:
return err($error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc filterUnsubscribe*(
self: Waku,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: string,
): Future[Result[bool, string]] {.async.} =
try:
if self.node.wakuFilterClient.isNil():
return err("wakuFilterClient is not mounted")
let unsubFut = self.node.filterUnsubscribe(pubsubTopic, contentTopics, peer)
if not await unsubFut.withTimeout(FilterOpTimeout):
return err("filter un-subscription timed out")
unsubFut.read().isOkOr:
return err($error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc filterUnsubscribeAll*(
self: Waku, peer: string
): Future[Result[bool, string]] {.async.} =
try:
if self.node.wakuFilterClient.isNil():
return err("wakuFilterClient is not mounted")
let unsubFut = self.node.filterUnsubscribeAll(peer)
if not await unsubFut.withTimeout(FilterOpTimeout):
return err("filter un-subscription all timed out")
unsubFut.read().isOkOr:
return err($error)
return ok(true)
except CatchableError as e:
return err(e.msg)
# --- lightpush ---
proc lightpushPublish*(
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string
): Future[Result[string, string]] {.async.} =
try:
if self.node.wakuLegacyLightpushClient.isNil():
return err("wakuLegacyLightpushClient is not mounted")
let remotePeer = parsePeerInfo(peer).valueOr:
return err("lightpushPublish failed to parse peer addr: " & $error)
let msgHashHex = (
await self.node.wakuLegacyLightpushClient.publish(
pubsubTopic, message, remotePeer
)
).valueOr:
return err($error)
return ok(msgHashHex)
except CatchableError as e:
return err(e.msg)
# --- store ---
proc storeQuery*(
self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int
): Future[Result[StoreQueryResponse, string]] {.async.} =
try:
if self.node.wakuStoreClient.isNil():
return err("wakuStoreClient is not mounted")
let remotePeer = parsePeerInfo(peer).valueOr:
return err("storeQuery failed to parse peer addr: " & $error)
let queryFut = self.node.wakuStoreClient.query(request, remotePeer)
if not await queryFut.withTimeout(timeoutMs.milliseconds):
return err("storeQuery timed out")
let queryResponse = queryFut.read().valueOr:
return err("storeQuery failed: " & $error)
return ok(queryResponse)
except CatchableError as e:
return err(e.msg)
# --- peer management ---
proc connect*(
self: Waku, peers: seq[string], timeoutMs: uint32
): Future[Result[bool, string]] {.async.} =
try:
await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static")
return ok(true)
except CatchableError as e:
return err(e.msg)
proc disconnectPeerById*(
self: Waku, peerId: string
): Future[Result[bool, string]] {.async.} =
try:
let pId = PeerId.init(peerId).valueOr:
return err($error)
await self.node.peerManager.disconnectNode(pId)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} =
try:
await self.node.peerManager.disconnectAllPeers()
return ok(true)
except CatchableError as e:
return err(e.msg)
proc dialPeer*(
self: Waku, peerAddr: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async.} =
try:
let remotePeerInfo = parsePeerInfo(peerAddr).valueOr:
return err($error)
let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol)
if conn.isNone():
return err("failed dialing peer")
return ok(true)
except CatchableError as e:
return err(e.msg)
proc dialPeerById*(
self: Waku, peerId: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async.} =
try:
let pId = PeerId.init(peerId).valueOr:
return err($error)
let conn = await self.node.peerManager.dialPeer(pId, protocol)
if conn.isNone():
return err("failed dialing peer")
return ok(true)
except CatchableError as e:
return err(e.msg)
proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} =
try:
return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId))
except CatchableError as e:
return err(e.msg)
proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.async.} =
try:
return ok(
self.node.peerManager.switch.peerStore
.peers()
.filterIt(it.connectedness == Connected)
.mapIt($it.peerId)
)
except CatchableError as e:
return err(e.msg)
proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} =
try:
let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers()
return ok(concat(inPeerIds, outPeerIds).mapIt($it))
except CatchableError as e:
return err(e.msg)
proc peerIdsByProtocol*(
self: Waku, protocol: string
): Future[Result[seq[string], string]] {.async.} =
try:
return ok(
self.node.peerManager.switch.peerStore
.peers(protocol)
.filterIt(it.connectedness == Connected)
.mapIt($it.peerId)
)
except CatchableError as e:
return err(e.msg)
# --- discovery ---
proc dnsDiscovery*(
self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int
): Future[Result[seq[string], string]] {.async.} =
try:
let dnsNameServers = @[parseIpAddress(nameServer)]
let discoveredPeers = (
await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers)
).valueOr:
return err("failed discovering peers from DNS: " & $error)
var multiAddresses = newSeq[string]()
for discPeer in discoveredPeers:
for address in discPeer.addrs:
multiAddresses.add($address & "/p2p/" & $discPeer)
return ok(multiAddresses)
except CatchableError as e:
return err(e.msg)
proc discv5UpdateBootnodes*(
self: Waku, bootnodes: seq[string]
): Future[Result[bool, string]] {.async.} =
try:
if self.wakuDiscv5.isNil():
return err("discv5 not started")
let jsonArray = "[" & bootnodes.mapIt("\"" & it & "\"").join(",") & "]"
self.wakuDiscv5.updateBootstrapRecords(jsonArray).isOkOr:
return err("error in discv5UpdateBootnodes: " & $error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
try:
if self.wakuDiscv5.isNil():
return err("discv5 not started")
(await self.wakuDiscv5.start()).isOkOr:
return err("error starting discv5: " & $error)
return ok(true)
except CatchableError as e:
return err(e.msg)
proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
try:
if self.wakuDiscv5.isNil():
return err("discv5 not started")
await self.wakuDiscv5.stop()
return ok(true)
except CatchableError as e:
return err(e.msg)
proc peerExchangeRequest*(
self: Waku, numPeers: uint64
): Future[Result[int, string]] {.async.} =
try:
let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr:
return err("failed peer exchange: " & $error)
return ok(numPeersRecv)
except CatchableError as e:
return err(e.msg)
# --- debug / info ---
proc version*(self: Waku): Future[Result[string, string]] {.async.} =
return ok(WakuNodeVersionString)
proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} =
try:
return ok(self.node.info().listenAddresses)
except CatchableError as e:
return err(e.msg)
proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} =
try:
return ok(self.node.enr.toURI())
except CatchableError as e:
return err(e.msg)
proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} =
try:
return ok($self.node.peerId())
except CatchableError as e:
return err(e.msg)
proc metrics*(self: Waku): Future[Result[string, string]] {.async.} =
{.gcsafe.}:
try:
return ok(defaultRegistry.toText())
except CatchableError as e:
return err(e.msg)
proc pingPeer*(
self: Waku, peerAddr: string, timeoutMs: int
): Future[Result[int64, string]] {.async.} =
try:
let peerInfo = parsePeerInfo(peerAddr).valueOr:
return err("pingPeer failed to parse peer addr: " & $error)
let conn = await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
defer:
await conn.close()
let pingRTT = await self.node.libp2pPing.ping(conn)
if pingRTT == 0.nanos:
return err("could not ping peer: rtt-0")
return ok(pingRTT.nanos)
except CatchableError as e:
return err(e.msg)
{.pop.}

View File

@ -14,7 +14,7 @@ import
brokers/broker_context
import
logos_delivery/waku/[node/peer_manager, waku_core, events/delivery_events],
logos_delivery/waku/[node/peer_manager, waku_core, api/events/filter_subscribe_events],
./common,
./protocol_metrics,
./rpc_codec,

View File

@ -24,9 +24,9 @@ import
logos_delivery/waku/waku_core,
logos_delivery/waku/node/health_monitor/topic_health,
logos_delivery/waku/requests/health_requests,
logos_delivery/waku/events/health_events,
logos_delivery/waku/api/events/health_events,
./message_id,
logos_delivery/waku/events/peer_events
logos_delivery/waku/api/events/peer_events
from logos_delivery/waku/waku_core/codecs import WakuRelayCodec
export WakuRelayCodec

View File

@ -12,7 +12,7 @@ import
[topic_health, health_status, protocol_health, health_report],
logos_delivery/waku/requests/health_requests,
logos_delivery/waku/requests/node_requests,
logos_delivery/waku/events/health_events,
logos_delivery/waku/api/events/health_events,
logos_delivery/waku/common/waku_protocol,
logos_delivery/waku/factory/waku_conf
import tools/confutils/cli_args

View File

@ -14,8 +14,7 @@ import
logos_delivery/waku/[
waku_node,
waku_core,
events/message_events,
events/health_events,
api/events/health_events,
waku_relay/protocol,
waku_archive,
waku_archive/common as archive_common,

View File

@ -12,7 +12,6 @@ import
logos_delivery/waku/[
waku_node,
waku_core,
events/message_events,
waku_relay/protocol,
node/waku_node/filter,
node/subscription_manager,

View File

@ -10,7 +10,7 @@ import ../testlib/[common, wakucore, wakunode, testasync]
import logos_delivery
import logos_delivery/waku/[waku_node, waku_core]
import logos_delivery/waku/factory/waku_conf
import logos_delivery/waku/events/message_events as waku_message_events
import logos_delivery/api/messaging_client_api as waku_message_events
import tools/confutils/cli_args
import logos_delivery/channels/reliable_channel_manager

View File

@ -19,8 +19,8 @@ import
node/waku_node/store,
node/waku_node/lightpush,
node/waku_node/filter,
events/health_events,
events/peer_events,
api/events/health_events,
api/events/peer_events,
waku_archive,
]