feat: add LogosDelivery orchestrator as project entry point for FFI (#3970)

This commit is contained in:
Ivan FB 2026-06-23 01:20:09 +02:00 committed by GitHub
parent 2fe7e1c373
commit 1a3b3204fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 500 additions and 469 deletions

View File

@ -101,7 +101,14 @@ jobs:
touch nimbledeps/.nimble-setup
- name: Build binaries
run: make V=1 all
# -j1: `all` builds wakunode2 and liblogosdelivery, each via `nimble
# <task>`. Under -j2 the two nimble invocations re-resolve git deps
# concurrently and clobber each other in the shared ~/.nimble/pkgcache
# (e.g. "destination already exists", "could not lock config file"),
# failing the build. Serializing the targets removes the race; Nim's
# own --parallelBuild still parallelizes compilation. Mirrors the test
# job, which already forces -j1.
run: make V=1 -j1 all
build-windows:
needs: changes

View File

@ -7,7 +7,7 @@ type CliArgs = object
defaultValue: "", desc: "ETH RPC Endpoint, if passed, RLN is enabled"
.}: string
proc periodicSender(w: Waku): Future[void] {.async.} =
proc periodicSender(logos: LogosDelivery): Future[void] {.async.} =
let sentListener = MessageSentEvent.listen(
proc(event: MessageSentEvent) {.async: (raises: []).} =
echo "Message sent with request ID: ",
@ -45,7 +45,7 @@ proc periodicSender(w: Waku): Future[void] {.async.} =
payload = "Hello Waku! Message number: " & $counter,
)
let sendRequestId = (await w.send(envelope)).valueOr:
let sendRequestId = (await logos.messagingClient.send(envelope)).valueOr:
echo "Failed to send message: ", error
quit(QuitFailure)
@ -75,16 +75,12 @@ when isMainModule:
conf.preset = "twn"
conf.ethClientUrls = @[EthRpcUrl(args.ethRpcEndpoint)]
# Create the node using the library API's createNode function
let node = (waitFor createNode(conf)).valueOr:
# Create the full Logos Messaging stack (Waku + messaging + channels)
let node = (waitFor LogosDelivery.new(conf)).valueOr:
echo "Failed to create node: ", error
quit(QuitFailure)
echo("Waku node created successfully!")
node.mountMessagingClient().isOkOr:
echo "Failed to mount messaging: ", error
quit(QuitFailure)
echo("Logos Messaging node created successfully!")
# Start the node
(waitFor node.start()).isOkOr:

View File

@ -1,6 +1,6 @@
import ffi
import std/locks
import logos_delivery/waku/factory/waku
import logos_delivery
declareLibrary("logosdelivery")
@ -8,7 +8,7 @@ var eventCallbackLock: Lock
initLock(eventCallbackLock)
template requireInitializedNode*(
ctx: ptr FFIContext[Waku], opName: string, onError: untyped
ctx: ptr FFIContext[LogosDelivery], opName: string, onError: untyped
) =
if isNil(ctx):
let errMsg {.inject.} = opName & " failed: invalid context"
@ -18,7 +18,7 @@ template requireInitializedNode*(
onError
proc logosdelivery_set_event_callback(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.dynlib, exportc, cdecl.} =
if isNil(ctx):
echo "error: invalid context in logosdelivery_set_event_callback"

View File

@ -22,32 +22,32 @@ proc getMetrics(): string =
return defaultRegistry.toText() ## defaultRegistry is {.global.} in metrics module
proc waku_version(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok(WakuNodeVersionString)
proc waku_listen_addresses(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
## returns a comma-separated string of the listen addresses
return ok(ctx.myLib[].node.getMultiaddresses().join(","))
return ok(ctx.myLib[].waku.node.getMultiaddresses().join(","))
proc waku_get_my_enr(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok(ctx.myLib[].node.enr.toURI())
return ok(ctx.myLib[].waku.node.enr.toURI())
proc waku_get_my_peerid(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok($ctx.myLib[].node.peerId())
return ok($ctx.myLib[].waku.node.peerId())
proc waku_get_metrics(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok(getMetrics())
proc waku_is_online(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok($ctx.myLib[].healthMonitor.onlineMonitor.amIOnline())
return ok($ctx.myLib[].waku.healthMonitor.onlineMonitor.amIOnline())

View File

@ -39,7 +39,7 @@ proc performPeerExchangeRequestTo*(
return ok(numPeersRecv)
proc waku_discv5_update_bootnodes(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
bootnodes: cstring,
@ -47,14 +47,14 @@ proc waku_discv5_update_bootnodes(
## Updates the bootnode list used for discovering new peers via DiscoveryV5
## bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
updateDiscv5BootstrapNodes($bootnodes, ctx.myLib[]).isOkOr:
updateDiscv5BootstrapNodes($bootnodes, ctx.myLib[].waku).isOkOr:
error "UPDATE_DISCV5_BOOTSTRAP_NODES failed", error = error
return err($error)
return ok("discovery request processed correctly")
proc waku_dns_discovery(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
enrTreeUrl: cstring,
@ -69,27 +69,27 @@ proc waku_dns_discovery(
return ok(nodes.join(","))
proc waku_start_discv5(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
(await ctx.myLib[].wakuDiscv5.start()).isOkOr:
(await ctx.myLib[].waku.wakuDiscv5.start()).isOkOr:
error "START_DISCV5 failed", error = error
return err("error starting discv5: " & $error)
return ok("discv5 started correctly")
proc waku_stop_discv5(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
await ctx.myLib[].wakuDiscv5.stop()
await ctx.myLib[].waku.wakuDiscv5.stop()
return ok("discv5 stopped correctly")
proc waku_peer_exchange_request(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
numPeers: uint64,
) {.ffi.} =
let numValidPeers = (await performPeerExchangeRequestTo(numPeers, ctx.myLib[])).valueOr:
let numValidPeers = (await performPeerExchangeRequestTo(numPeers, ctx.myLib[].waku)).valueOr:
error "waku_peer_exchange_request failed", error = error
return err("failed peer exchange: " & $error)

View File

@ -13,7 +13,7 @@ import
proc createWaku(
configJson: cstring, appCallbacks: AppCallbacks = nil
): Future[Result[Waku, string]] {.async.} =
): Future[Result[LogosDelivery, string]] {.async.} =
var conf = defaultWakuNodeConf().valueOr:
return err("Failed creating node: " & error)
@ -47,19 +47,15 @@ proc createWaku(
appCallbacks.relayHandler = nil
appCallbacks.topicHealthChangeHandler = nil
# TODO: Convert `confJson` directly to `WakuConf`
var wakuConf = conf.toWakuConf().valueOr:
return err("Configuration error: " & $error)
conf.rest = false ## libwaku never runs the REST server
wakuConf.restServerConf = none(RestServerConf) ## don't want REST in libwaku
let logosRes = (await LogosDelivery.new(conf, appCallbacks)).valueOr:
error "LogosDelivery initialization failed", error = error
return err("Failed setting up LogosDelivery: " & $error)
let wakuRes = (await Waku.new(wakuConf, appCallbacks)).valueOr:
error "waku initialization failed", error = error
return err("Failed setting up Waku: " & $error)
return ok(logosRes)
return ok(wakuRes)
registerReqFFI(CreateNodeWithCallbacksRequest, ctx: ptr FFIContext[Waku]):
registerReqFFI(CreateNodeWithCallbacksRequest, ctx: ptr FFIContext[LogosDelivery]):
proc(
configJson: cstring, appCallbacks: AppCallbacks
): Future[Result[string, string]] {.async.} =
@ -70,7 +66,7 @@ registerReqFFI(CreateNodeWithCallbacksRequest, ctx: ptr FFIContext[Waku]):
return ok("")
proc waku_start(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
(await ctx.myLib[].start()).isOkOr:
error "START_NODE failed", error = error
@ -78,7 +74,7 @@ proc waku_start(
return ok("")
proc waku_stop(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
(await ctx.myLib[].stop()).isOkOr:
error "STOP_NODE failed", error = error

View File

@ -12,41 +12,46 @@ type PeerInfo = object
addresses: seq[string]
proc waku_get_peerids_from_peerstore(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
## returns a comma-separated string of peerIDs
let peerIDs =
ctx.myLib[].node.peerManager.switch.peerStore.peers().mapIt($it.peerId).join(",")
let peerIDs = ctx.myLib[].waku.node.peerManager.switch.peerStore
.peers()
.mapIt($it.peerId)
.join(",")
return ok(peerIDs)
proc waku_connect(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
peerMultiAddr: cstring,
timeoutMs: cuint,
) {.ffi.} =
let peers = ($peerMultiAddr).split(",").mapIt(strip(it))
await ctx.myLib[].node.connectToNodes(peers, source = "static")
await ctx.myLib[].waku.node.connectToNodes(peers, source = "static")
return ok("")
proc waku_disconnect_peer_by_id(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer, peerId: cstring
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
peerId: cstring,
) {.ffi.} =
let pId = PeerId.init($peerId).valueOr:
error "DISCONNECT_PEER_BY_ID failed", error = $error
return err($error)
await ctx.myLib[].node.peerManager.disconnectNode(pId)
await ctx.myLib[].waku.node.peerManager.disconnectNode(pId)
return ok("")
proc waku_disconnect_all_peers(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
await ctx.myLib[].node.peerManager.disconnectAllPeers()
await ctx.myLib[].waku.node.peerManager.disconnectAllPeers()
return ok("")
proc waku_dial_peer(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
peerMultiAddr: cstring,
@ -56,7 +61,7 @@ proc waku_dial_peer(
let remotePeerInfo = parsePeerInfo($peerMultiAddr).valueOr:
error "DIAL_PEER failed", error = $error
return err($error)
let conn = await ctx.myLib[].node.peerManager.dialPeer(remotePeerInfo, $protocol)
let conn = await ctx.myLib[].waku.node.peerManager.dialPeer(remotePeerInfo, $protocol)
if conn.isNone():
let msg = "failed dialing peer"
error "DIAL_PEER failed", error = msg, peerId = $remotePeerInfo.peerId
@ -64,7 +69,7 @@ proc waku_dial_peer(
return ok("")
proc waku_dial_peer_by_id(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
peerId: cstring,
@ -74,7 +79,7 @@ proc waku_dial_peer_by_id(
let pId = PeerId.init($peerId).valueOr:
error "DIAL_PEER_BY_ID failed", error = $error
return err($error)
let conn = await ctx.myLib[].node.peerManager.dialPeer(pId, $protocol)
let conn = await ctx.myLib[].waku.node.peerManager.dialPeer(pId, $protocol)
if conn.isNone():
let msg = "failed dialing peer"
error "DIAL_PEER_BY_ID failed", error = msg, peerId = $peerId
@ -83,12 +88,12 @@ proc waku_dial_peer_by_id(
return ok("")
proc waku_get_connected_peers_info(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
## returns a JSON string mapping peerIDs to objects with protocols and addresses
var peersMap = initTable[string, PeerInfo]()
let peers = ctx.myLib[].node.peerManager.switch.peerStore.peers().filterIt(
let peers = ctx.myLib[].waku.node.peerManager.switch.peerStore.peers().filterIt(
it.connectedness == Connected
)
@ -104,23 +109,23 @@ proc waku_get_connected_peers_info(
return ok(jsonStr)
proc waku_get_connected_peers(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
## returns a comma-separated string of peerIDs
let
(inPeerIds, outPeerIds) = ctx.myLib[].node.peerManager.connectedPeers()
(inPeerIds, outPeerIds) = ctx.myLib[].waku.node.peerManager.connectedPeers()
connectedPeerids = concat(inPeerIds, outPeerIds)
return ok(connectedPeerids.mapIt($it).join(","))
proc waku_get_peerids_by_protocol(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
protocol: cstring,
) {.ffi.} =
## returns a comma-separated string of peerIDs that mount the given protocol
let connectedPeers = ctx.myLib[].node.peerManager.switch.peerStore
let connectedPeers = ctx.myLib[].waku.node.peerManager.switch.peerStore
.peers($protocol)
.filterIt(it.connectedness == Connected)
.mapIt($it.peerId)

View File

@ -6,7 +6,7 @@ import
library/declare_lib
proc waku_ping_peer(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
peerAddr: cstring,
@ -18,12 +18,13 @@ proc waku_ping_peer(
let timeout = chronos.milliseconds(timeoutMs)
proc ping(): Future[Result[Duration, string]] {.async, gcsafe.} =
try:
let conn =
await ctx.myLib[].node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
let conn = await ctx.myLib[].waku.node.switch.dial(
peerInfo.peerId, peerInfo.addrs, PingCodec
)
defer:
await conn.close()
let pingRTT = await ctx.myLib[].node.libp2pPing.ping(conn)
let pingRTT = await ctx.myLib[].waku.node.libp2pPing.ping(conn)
if pingRTT == 0.nanos:
return err("could not ping peer: rtt-0")
return ok(pingRTT)

View File

@ -25,7 +25,7 @@ proc checkFilterClientMounted(waku: Waku): Result[string, string] =
return ok("")
proc waku_filter_subscribe(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
@ -36,18 +36,18 @@ proc waku_filter_subscribe(
callEventCallback(ctx, "onReceivedMessage"):
$JsonMessageEvent.new(pubsubTopic, msg)
checkFilterClientMounted(ctx.myLib[]).isOkOr:
checkFilterClientMounted(ctx.myLib[].waku).isOkOr:
return err($error)
var filterPushEventCallback = FilterPushHandler(onReceivedMessage(ctx))
ctx.myLib[].node.wakuFilterClient.registerPushHandler(filterPushEventCallback)
ctx.myLib[].waku.node.wakuFilterClient.registerPushHandler(filterPushEventCallback)
let peer = ctx.myLib[].node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg = "could not find peer with WakuFilterSubscribeCodec when subscribing"
error "fail filter subscribe", error = errorMsg
return err(errorMsg)
let subFut = ctx.myLib[].node.filterSubscribe(
let subFut = ctx.myLib[].waku.node.filterSubscribe(
some(PubsubTopic($pubsubTopic)),
($contentTopics).split(",").mapIt(ContentTopic(it)),
peer,
@ -61,22 +61,22 @@ proc waku_filter_subscribe(
return ok("")
proc waku_filter_unsubscribe(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
contentTopics: cstring,
) {.ffi.} =
checkFilterClientMounted(ctx.myLib[]).isOkOr:
checkFilterClientMounted(ctx.myLib[].waku).isOkOr:
return err($error)
let peer = ctx.myLib[].node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when unsubscribing"
error "fail filter process", error = errorMsg
return err(errorMsg)
let subFut = ctx.myLib[].node.filterUnsubscribe(
let subFut = ctx.myLib[].waku.node.filterUnsubscribe(
some(PubsubTopic($pubsubTopic)),
($contentTopics).split(",").mapIt(ContentTopic(it)),
peer,
@ -88,18 +88,18 @@ proc waku_filter_unsubscribe(
return ok("")
proc waku_filter_unsubscribe_all(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
checkFilterClientMounted(ctx.myLib[]).isOkOr:
checkFilterClientMounted(ctx.myLib[].waku).isOkOr:
return err($error)
let peer = ctx.myLib[].node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when unsubscribing all"
error "fail filter unsubscribe all", error = errorMsg
return err(errorMsg)
let unsubFut = ctx.myLib[].node.filterUnsubscribeAll(peer)
let unsubFut = ctx.myLib[].waku.node.filterUnsubscribeAll(peer)
if not await unsubFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter un-subscription all timed out"

View File

@ -13,13 +13,13 @@ import
library/declare_lib
proc waku_lightpush_publish(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
jsonWakuMessage: cstring,
) {.ffi.} =
if ctx.myLib[].node.wakuLightpushClient.isNil():
if ctx.myLib[].waku.node.wakuLightpushClient.isNil():
let errorMsg = "LightpushRequest waku.node.wakuLightpushClient is nil"
error "PUBLISH failed", error = errorMsg
return err(errorMsg)
@ -35,14 +35,14 @@ proc waku_lightpush_publish(
let msg = json_message_event.toWakuMessage(jsonMessage).valueOr:
return err("Problem building the WakuMessage: " & $error)
let peerOpt = ctx.myLib[].node.peerManager.selectPeer(WakuLightPushCodec)
let peerOpt = ctx.myLib[].waku.node.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
let errorMsg = "failed to lightpublish message, no suitable remote peers"
error "PUBLISH failed", error = errorMsg
return err(errorMsg)
let msgHashHex = (
await ctx.myLib[].node.wakuLegacyLightpushClient.publish(
await ctx.myLib[].waku.node.wakuLegacyLightpushClient.publish(
$pubsubTopic, msg, peer = peerOpt.get()
)
).valueOr:

View File

@ -15,54 +15,54 @@ import
library/declare_lib
proc waku_relay_get_peers_in_mesh(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
let meshPeers = ctx.myLib[].node.wakuRelay.getPeersInMesh($pubsubTopic).valueOr:
let meshPeers = ctx.myLib[].waku.node.wakuRelay.getPeersInMesh($pubsubTopic).valueOr:
error "LIST_MESH_PEERS failed", error = error
return err($error)
## returns a comma-separated string of peerIDs
return ok(meshPeers.mapIt($it).join(","))
proc waku_relay_get_num_peers_in_mesh(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
let numPeersInMesh = ctx.myLib[].node.wakuRelay.getNumPeersInMesh($pubsubTopic).valueOr:
let numPeersInMesh = ctx.myLib[].waku.node.wakuRelay.getNumPeersInMesh($pubsubTopic).valueOr:
error "NUM_MESH_PEERS failed", error = error
return err($error)
return ok($numPeersInMesh)
proc waku_relay_get_connected_peers(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
## Returns the list of all connected peers to an specific pubsub topic
let connPeers = ctx.myLib[].node.wakuRelay.getConnectedPeers($pubsubTopic).valueOr:
let connPeers = ctx.myLib[].waku.node.wakuRelay.getConnectedPeers($pubsubTopic).valueOr:
error "LIST_CONNECTED_PEERS failed", error = error
return err($error)
## returns a comma-separated string of peerIDs
return ok(connPeers.mapIt($it).join(","))
proc waku_relay_get_num_connected_peers(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
let numConnPeers = ctx.myLib[].node.wakuRelay.getNumConnectedPeers($pubsubTopic).valueOr:
let numConnPeers = ctx.myLib[].waku.node.wakuRelay.getNumConnectedPeers($pubsubTopic).valueOr:
error "NUM_CONNECTED_PEERS failed", error = error
return err($error)
return ok($numConnPeers)
proc waku_relay_add_protected_shard(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
clusterId: cint,
@ -73,7 +73,7 @@ proc waku_relay_add_protected_shard(
try:
let relayShard = RelayShard(clusterId: uint16(clusterId), shardId: uint16(shardId))
let protectedShard = ProtectedShard.parseCmdArg($relayShard & ":" & $publicKey)
ctx.myLib[].node.wakuRelay.addSignedShardsValidator(
ctx.myLib[].waku.node.wakuRelay.addSignedShardsValidator(
@[protectedShard], uint16(clusterId)
)
except ValueError as exc:
@ -82,20 +82,20 @@ proc waku_relay_add_protected_shard(
return ok("")
proc waku_relay_subscribe(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
echo "Subscribing to topic: " & $pubSubTopic & " ..."
proc onReceivedMessage(ctx: ptr FFIContext[Waku]): WakuRelayHandler =
proc onReceivedMessage(ctx: ptr FFIContext[LogosDelivery]): WakuRelayHandler =
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
callEventCallback(ctx, "onReceivedMessage"):
$JsonMessageEvent.new(pubsubTopic, msg)
var cb = onReceivedMessage(ctx)
ctx.myLib[].node.subscribe(
ctx.myLib[].waku.node.subscribe(
(kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic),
handler = WakuRelayHandler(cb),
).isOkOr:
@ -104,19 +104,21 @@ proc waku_relay_subscribe(
return ok("")
proc waku_relay_unsubscribe(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
ctx.myLib[].node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic)).isOkOr:
ctx.myLib[].waku.node.unsubscribe(
(kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic)
).isOkOr:
error "UNSUBSCRIBE failed", error = error
return err($error)
return ok("")
proc waku_relay_publish(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
@ -136,7 +138,7 @@ proc waku_relay_publish(
let msg = json_message_event.toWakuMessage(jsonMessage).valueOr:
return err("Problem building the WakuMessage: " & $error)
(await ctx.myLib[].node.wakuRelay.publish($pubsubTopic, msg)).isOkOr:
(await ctx.myLib[].waku.node.wakuRelay.publish($pubsubTopic, msg)).isOkOr:
error "PUBLISH failed", error = error
return err($error)
@ -144,13 +146,13 @@ proc waku_relay_publish(
return ok(msgHash)
proc waku_default_pubsub_topic(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic
return ok(DefaultPubsubTopic)
proc waku_content_topic(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
appName: cstring,
@ -163,7 +165,7 @@ proc waku_content_topic(
return ok(fmt"/{$appName}/{$appVersion}/{$contentTopicName}/{$encoding}")
proc waku_pubsub_topic(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
topicName: cstring,

View File

@ -68,7 +68,7 @@ func fromJsonNode(jsonContent: JsonNode): Result[StoreQueryRequest, string] =
)
proc waku_store_query(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
jsonQuery: cstring,
@ -87,7 +87,7 @@ proc waku_store_query(
return err("StoreRequest failed to parse peer addr: " & $error)
let queryResponse = (
await ctx.myLib[].node.wakuStoreClient.query(storeQueryRequest, peer)
await ctx.myLib[].waku.node.wakuStoreClient.query(storeQueryRequest, peer)
).valueOr:
return err("StoreRequest failed store query: " & $error)

View File

@ -5,6 +5,7 @@ import
logos_delivery/waku/waku_core/message/message,
logos_delivery/waku/waku_core/topics/pubsub_topic,
logos_delivery/waku/waku_relay,
logos_delivery,
logos_delivery/waku/factory/waku,
logos_delivery/waku/node/waku_node,
logos_delivery/waku/node/health_monitor/health_status,
@ -46,7 +47,7 @@ proc waku_new(
return nil
## Create the Waku thread that will keep waiting for req from the main thread.
var ctx = ffi.createFFIContext[Waku]().valueOr:
var ctx = ffi.createFFIContext[LogosDelivery]().valueOr:
let msg = "Error in createFFIContext: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
@ -93,7 +94,7 @@ proc waku_new(
return ctx
proc waku_destroy(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
): cint {.dynlib, exportc, cdecl.} =
initializeLibrary()
checkParams(ctx, callback, userData)

View File

@ -3,17 +3,17 @@ import logos_delivery/waku/factory/waku_state_info
import tools/confutils/[cli_args, config_option_meta]
proc logosdelivery_get_available_node_info_ids(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
## Returns the list of all available node info item ids that
## can be queried with `get_node_info_item`.
requireInitializedNode(ctx, "GetNodeInfoIds"):
return err(errMsg)
return ok($ctx.myLib[].stateInfo.getAllPossibleInfoItemIds())
return ok($ctx.myLib[].waku.stateInfo.getAllPossibleInfoItemIds())
proc logosdelivery_get_node_info(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
nodeInfoId: cstring,
@ -28,10 +28,10 @@ proc logosdelivery_get_node_info(
except ValueError:
return err("Invalid node info id: " & $nodeInfoId)
return ok(ctx.myLib[].stateInfo.getNodeInfoItem(infoItemIdEnum))
return ok(ctx.myLib[].waku.stateInfo.getNodeInfoItem(infoItemIdEnum))
proc logosdelivery_get_available_configs(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
## Returns information about the accepted config items.
requireInitializedNode(ctx, "GetAvailableConfigs"):

View File

@ -9,7 +9,7 @@ import
../declare_lib
proc logosdelivery_subscribe(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
contentTopicStr: cstring,
@ -20,14 +20,14 @@ proc logosdelivery_subscribe(
# ContentTopic is just a string type alias
let contentTopic = ContentTopic($contentTopicStr)
(await api.subscribe(ctx.myLib[], contentTopic)).isOkOr:
(await api.subscribe(ctx.myLib[].waku, contentTopic)).isOkOr:
let errMsg = $error
return err("Subscribe failed: " & errMsg)
return ok("")
proc logosdelivery_unsubscribe(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
contentTopicStr: cstring,
@ -38,14 +38,14 @@ proc logosdelivery_unsubscribe(
# ContentTopic is just a string type alias
let contentTopic = ContentTopic($contentTopicStr)
api.unsubscribe(ctx.myLib[], contentTopic).isOkOr:
api.unsubscribe(ctx.myLib[].waku, contentTopic).isOkOr:
let errMsg = $error
return err("Unsubscribe failed: " & errMsg)
return ok("")
proc logosdelivery_send(
ctx: ptr FFIContext[Waku],
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
messageJson: cstring,
@ -83,8 +83,8 @@ proc logosdelivery_send(
contentTopic = contentTopic, payload = payload, ephemeral = ephemeral
)
# Send the message
let requestId = (await api.send(ctx.myLib[], envelope)).valueOr:
# Send the message via the messaging layer's own API.
let requestId = (await ctx.myLib[].messagingClient.send(envelope)).valueOr:
let errMsg = $error
return err("Send failed: " & errMsg)

View File

@ -1,7 +1,7 @@
import std/json
import chronos, chronicles, results, ffi
import
logos_delivery/waku/factory/waku,
logos_delivery,
logos_delivery/waku/node/waku_node,
logos_delivery/waku/api/[api, types],
logos_delivery/waku/events/[message_events, health_events],
@ -13,14 +13,14 @@ import
proc `%`*(id: RequestId): JsonNode =
%($id)
registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]):
registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[LogosDelivery]):
proc(configJson: cstring): Future[Result[string, string]] {.async.} =
let conf = parseNodeConfFromJson($configJson).valueOr:
error "Failed to assemble WakuNodeConf from JSON",
error = error, configJson = $configJson
return err("failed parseNodeConfFromJson " & error)
ctx.myLib[] = (await api.createNode(conf)).valueOr:
ctx.myLib[] = (await LogosDelivery.new(conf)).valueOr:
let errMsg = $error
chronicles.error "CreateNodeRequest failed", err = errMsg
return err(errMsg)
@ -28,7 +28,7 @@ registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]):
return ok("")
proc logosdelivery_destroy(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
): cint {.dynlib, exportc, cdecl.} =
initializeLibrary()
checkParams(ctx, callback, userData)
@ -52,7 +52,7 @@ proc logosdelivery_create_node(
echo "error: missing callback in logosdelivery_create_node"
return nil
var ctx = ffi.createFFIContext[Waku]().valueOr:
var ctx = ffi.createFFIContext[LogosDelivery]().valueOr:
let msg = "Error in createFFIContext: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
@ -73,14 +73,14 @@ proc logosdelivery_create_node(
return ctx
proc logosdelivery_start_node(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
requireInitializedNode(ctx, "START_NODE"):
return err(errMsg)
# setting up outgoing event listeners
let sentListener = MessageSentEvent.listen(
ctx.myLib[].brokerCtx,
ctx.myLib[].waku.brokerCtx,
proc(event: MessageSentEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessageSent"):
$newJsonEvent("message_sent", event),
@ -89,7 +89,7 @@ proc logosdelivery_start_node(
return err("MessageSentEvent.listen failed: " & $error)
let errorListener = MessageErrorEvent.listen(
ctx.myLib[].brokerCtx,
ctx.myLib[].waku.brokerCtx,
proc(event: MessageErrorEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessageError"):
$newJsonEvent("message_error", event),
@ -98,7 +98,7 @@ proc logosdelivery_start_node(
return err("MessageErrorEvent.listen failed: " & $error)
let propagatedListener = MessagePropagatedEvent.listen(
ctx.myLib[].brokerCtx,
ctx.myLib[].waku.brokerCtx,
proc(event: MessagePropagatedEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessagePropagated"):
$newJsonEvent("message_propagated", event),
@ -107,7 +107,7 @@ proc logosdelivery_start_node(
return err("MessagePropagatedEvent.listen failed: " & $error)
let receivedListener = MessageReceivedEvent.listen(
ctx.myLib[].brokerCtx,
ctx.myLib[].waku.brokerCtx,
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessageReceived"):
$newJsonEvent("message_received", event),
@ -116,7 +116,7 @@ proc logosdelivery_start_node(
return err("MessageReceivedEvent.listen failed: " & $error)
let ConnectionStatusChangeListener = EventConnectionStatusChange.listen(
ctx.myLib[].brokerCtx,
ctx.myLib[].waku.brokerCtx,
proc(event: EventConnectionStatusChange) {.async: (raises: []).} =
callEventCallback(ctx, "onConnectionStatusChange"):
$newJsonEvent("connection_status_change", event),
@ -124,16 +124,6 @@ proc logosdelivery_start_node(
chronicles.error "ConnectionStatusChange.listen failed", err = $error
return err("ConnectionStatusChange.listen failed: " & $error)
ctx.myLib[].mountMessagingClient().isOkOr:
let errMsg = $error
chronicles.error "mountMessagingClient failed", error = errMsg
return err("failed to mount messaging: " & errMsg)
ctx.myLib[].mountReliableChannelManager().isOkOr:
let errMsg = $error
chronicles.error "mountReliableChannelManager failed", err = errMsg
return err("failed to mount reliable channel manager: " & errMsg)
(await ctx.myLib[].start()).isOkOr:
let errMsg = $error
chronicles.error "START_NODE failed", err = errMsg
@ -141,16 +131,16 @@ proc logosdelivery_start_node(
return ok("")
proc logosdelivery_stop_node(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
requireInitializedNode(ctx, "STOP_NODE"):
return err(errMsg)
await MessageErrorEvent.dropAllListeners(ctx.myLib[].brokerCtx)
await MessageSentEvent.dropAllListeners(ctx.myLib[].brokerCtx)
await MessagePropagatedEvent.dropAllListeners(ctx.myLib[].brokerCtx)
await MessageReceivedEvent.dropAllListeners(ctx.myLib[].brokerCtx)
await EventConnectionStatusChange.dropAllListeners(ctx.myLib[].brokerCtx)
await MessageErrorEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
await MessageSentEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
await MessagePropagatedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
await MessageReceivedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
await EventConnectionStatusChange.dropAllListeners(ctx.myLib[].waku.brokerCtx)
(await ctx.myLib[].stop()).isOkOr:
let errMsg = $error

View File

@ -1,10 +1,15 @@
## Main module for using nwaku as a Nimble library
## Package entry point for using Logos Messaging as a Nimble library.
##
## This module re-exports the public API for creating and managing Waku nodes
## when using nwaku as a library dependency.
## This root module is a thin aggregator, following the standard Nimble layout:
## the implementation lives under `./logos_delivery/`, and importing the package
## name re-exports `LogosDelivery` together with every per-layer public API.
##
## See `logos_delivery/logos_delivery.nim` for `LogosDelivery`, the pure
## concentrator that owns one instance of each API layer
##
## Waku <- MessagingClient <- ReliableChannelManager
##
## and drives their shared `new` / `start` / `stop` lifecycle.
import logos_delivery/waku/api
export api
import logos_delivery/waku/factory/waku
export waku
import ./logos_delivery/logos_delivery as logos_delivery_impl
export logos_delivery_impl

View File

@ -15,6 +15,7 @@ import brokers/broker_context
import logos_delivery/waku/events/message_events as waku_message_events
import logos_delivery/messaging/messaging_client
import logos_delivery/waku/api/types
import logos_delivery/waku/waku_core/topics
import logos_delivery/waku/persistency/sds_persistency
@ -27,30 +28,43 @@ const SdsJobId = "sds"
## One persistency job shared by every channel's SDS state; rows are
## keyed by channelId.
type ReliableChannelManager* = ref object
channels: Table[ChannelId, ReliableChannel]
messagingClient: MessagingClient ## Borrowed from the owning `Waku`.
sendHandler: SendHandler
## Default egress dispatch for channels created through this manager.
## Constructed at mount time as a closure over `MessagingClient.send`
## so the channel layer itself stays callable-only.
brokerCtx: BrokerContext
type
ReliableChannelManagerConf* = object
## Per-layer config object for the reliable
## channel API. Placeholder for now (segmentation / SDS / rate-limit defaults
## will move here in a follow-up PR); kept so each layer owns its own config.
ReliableChannelManager* = ref object
channels: Table[ChannelId, ReliableChannel]
messagingClient: MessagingClient ## The channel layer chains onto messaging.
sendHandler: SendHandler
## Default egress dispatch for channels created through this manager.
## Built in `new` as a closure over `MessagingClient.send` so the channel
## layer itself stays callable-only.
brokerCtx: BrokerContext
proc new*(
T: type ReliableChannelManager,
conf: ReliableChannelManagerConf,
messagingClient: MessagingClient,
sendHandler: SendHandler,
brokerCtx: BrokerContext = globalBrokerContext(),
): Result[T, string] =
## The reliable channel layer chains onto the messaging layer: its default
## egress is `MessagingClient.send`, wrapped here so callers never wire the
## handler themselves.
if messagingClient.isNil():
return err("messaging client is required")
if sendHandler.isNil():
return err("sendHandler is required")
let defaultSendHandler: SendHandler = proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
return await messagingClient.send(envelope)
return ok(
T(
channels: initTable[ChannelId, ReliableChannel](),
messagingClient: messagingClient,
sendHandler: sendHandler,
sendHandler: defaultSendHandler,
brokerCtx: brokerCtx,
)
)

View File

@ -0,0 +1,104 @@
## `LogosDelivery` is the project entry point. It is a pure concentrator: it
## owns exactly one instance of each API layer
##
## Waku <- MessagingClient <- ReliableChannelManager
##
## and chains them together (each layer drives the one below it). Every layer
## keeps its own, separate public API — `LogosDelivery` only wires them up and
## drives the shared `new` / `start` / `stop` lifecycle.
{.push raises: [].}
import results, chronos, chronicles
import logos_delivery/waku/api
export api
import logos_delivery/waku/factory/waku
export waku
import logos_delivery/messaging/messaging_client
export messaging_client
import logos_delivery/channels/reliable_channel_manager
export reliable_channel_manager
import logos_delivery/waku/factory/waku_conf
import logos_delivery/waku/factory/app_callbacks
import logos_delivery/waku/api/[api_conf, types]
logScope:
topics = "logosdelivery"
type
LogosDeliveryConf* = object
## Aggregates the per-layer config objects. For now
## the sub-configs are derived from `WakuConf`; richer per-layer configuration
## (and how it is sourced) lands in a follow-up PR.
waku*: WakuConf
messaging*: MessagingClientConf
reliableChannel*: ReliableChannelManagerConf
LogosDelivery* = ref object ## Entry point. Holds one instance of each API layer.
waku*: Waku
messagingClient*: MessagingClient
reliableChannelManager*: ReliableChannelManager
proc init*(T: type LogosDeliveryConf, wakuConf: WakuConf): LogosDeliveryConf =
## Builds the aggregated config from a `WakuConf`. The messaging / reliable
## channel layers carry trivial config today; this is the seam where their
## dedicated config will be threaded through later.
LogosDeliveryConf(
waku: wakuConf,
messaging: MessagingClientConf(useP2PReliability: wakuConf.p2pReliability),
reliableChannel: ReliableChannelManagerConf(),
)
proc new*(
T: type LogosDelivery, conf: WakuNodeConf, appCallbacks: AppCallbacks = nil
): Future[Result[LogosDelivery, string]] {.async.} =
## Single entry point, from the CLI configuration type. Derives the aggregated
## per-layer config, then creates the full stack bottom-up so each layer can
## chain onto the one below.
let wakuConf = conf.toWakuConf().valueOr:
return err("failed to handle the configuration: " & error)
let layerConf = LogosDeliveryConf.init(wakuConf)
let waku = (await Waku.new(layerConf.waku, appCallbacks)).valueOr:
return err("failed to create Waku: " & error)
let messagingClient = MessagingClient.new(layerConf.messaging, waku.node).valueOr:
return err("failed to create MessagingClient: " & error)
let reliableChannelManager = ReliableChannelManager.new(
layerConf.reliableChannel, messagingClient, waku.brokerCtx
).valueOr:
return err("failed to create ReliableChannelManager: " & error)
return ok(
T(
waku: waku,
messagingClient: messagingClient,
reliableChannelManager: reliableChannelManager,
)
)
proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
## Starts each layer bottom-up: transport first, then messaging, then channels.
(await self.waku.start()).isOkOr:
return err("failed to start Waku: " & error)
self.messagingClient.start().isOkOr:
return err("failed to start MessagingClient: " & error)
self.reliableChannelManager.start().isOkOr:
return err("failed to start ReliableChannelManager: " & error)
return ok()
proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
## Stops in reverse order so higher layers drain before their dependencies.
await self.reliableChannelManager.stop()
await self.messagingClient.stop()
(await self.waku.stop()).isOkOr:
return err("failed to stop Waku: " & error)
return ok()

View File

@ -6,16 +6,25 @@ import
logos_delivery/messaging/delivery_service/[recv_service, send_service],
logos_delivery/messaging/delivery_service/send_service/delivery_task
type MessagingClient* = ref object
node: WakuNode
sendService*: SendService
recvService*: RecvService
started: bool
type
MessagingClientConf* = object
## Per-layer config object for the messaging API.
## Kept intentionally minimal for now; the full config surface lands in a
## follow-up PR. Today it only carries the p2p reliability toggle.
useP2PReliability*: bool
MessagingClient* = ref object
node: WakuNode
sendService*: SendService
recvService*: RecvService
started: bool
proc new*(
T: type MessagingClient, useP2PReliability: bool, node: WakuNode
T: type MessagingClient, conf: MessagingClientConf, node: WakuNode
): Result[T, string] =
let sendService = ?SendService.new(useP2PReliability, node)
## The messaging layer chains onto Waku: it drives the underlying
## `WakuNode` (Waku's core) for transport while exposing its own send/recv API.
let sendService = ?SendService.new(conf.useP2PReliability, node)
let recvService = RecvService.new(node)
ok(T(node: node, sendService: sendService, recvService: recvService))

View File

@ -4,9 +4,7 @@ import std/[net, options]
import chronicles, chronos, libp2p/peerid, results
import logos_delivery/waku/factory/waku
import logos_delivery/messaging/messaging_client
import logos_delivery/waku/[requests/health_requests, waku_core, waku_node]
import logos_delivery/messaging/delivery_service/send_service
import logos_delivery/waku/node/subscription_manager
import libp2p/peerid
import tools/confutils/cli_args
@ -48,9 +46,3 @@ proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] =
?checkApiAvailability(w)
return w.node.subscriptionManager.unsubscribe(contentTopic)
proc send*(
w: Waku, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
?checkApiAvailability(w)
return await w.messagingClient.send(envelope)

View File

@ -49,8 +49,6 @@ import
factory/app_callbacks,
persistency/persistency,
],
logos_delivery/channels/reliable_channel_manager,
logos_delivery/messaging/messaging_client,
./waku_conf,
./waku_state_info
@ -76,10 +74,6 @@ type Waku* = ref object
healthMonitor*: NodeHealthMonitor
messagingClient*: MessagingClient
reliableChannelManager*: ReliableChannelManager
restServer*: WakuRestServerRef
metricsServer*: MetricsHttpServerRef
appCallbacks*: AppCallbacks
@ -384,35 +378,6 @@ proc startDnsDiscoveryRetryLoop(waku: Waku): Future[void] {.async.} =
error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()
return
proc mountMessagingClient*(waku: Waku): Result[void, string] =
if not waku.messagingClient.isNil():
return err("messaging client already mounted")
if waku.node.started:
return err("cannot mount messaging client on a started node")
waku.messagingClient = MessagingClient.new(waku.conf.p2pReliability, waku.node).valueOr:
return err("could not create messaging client: " & $error)
return ok()
proc mountReliableChannelManager*(waku: Waku): Result[void, string] =
if not waku.reliableChannelManager.isNil():
return err("reliable channel manager already mounted")
if waku.messagingClient.isNil():
return err("reliable channel manager requires a mounted messaging client")
if waku.node.started:
return err("cannot mount reliable channel manager on a started node")
let messagingClient = waku.messagingClient
let defaultSendHandler: SendHandler = proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
return await messagingClient.send(envelope)
waku.reliableChannelManager = ReliableChannelManager.new(
messagingClient, defaultSendHandler, waku.brokerCtx
).valueOr:
return err("could not create reliable channel manager: " & $error)
return ok()
proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
if waku.node.started:
warn "start: waku node already started"
@ -565,14 +530,6 @@ proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
)
waku.healthMonitor.setOverallHealth(HealthStatus.READY)
if not waku.messagingClient.isNil():
waku.messagingClient.start().isOkOr:
return err("failed to start messaging client: " & $error)
if not waku.reliableChannelManager.isNil():
waku.reliableChannelManager.start().isOkOr:
return err("failed to start reliable channel manager: " & $error)
return ok()
proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
@ -590,12 +547,6 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
if not waku.wakuDiscv5.isNil():
await waku.wakuDiscv5.stop()
if not waku.reliableChannelManager.isNil():
await waku.reliableChannelManager.stop()
if not waku.messagingClient.isNil():
await waku.messagingClient.stop()
if not waku.node.isNil():
await waku.node.stop()

View File

@ -74,7 +74,7 @@ proc waitForShardHealthy(
suite "LM API health checking":
var
serviceNode {.threadvar.}: WakuNode
client {.threadvar.}: Waku
client {.threadvar.}: LogosDelivery
servicePeerInfo {.threadvar.}: RemotePeerInfo
asyncSetup:
@ -102,9 +102,7 @@ suite "LM API health checking":
conf.numShardsInNetwork = 1
conf.rest = false
client = (await createNode(conf)).valueOr:
raiseAssert error
client.mountMessagingClient().isOkOr:
client = (await LogosDelivery.new(conf)).valueOr:
raiseAssert error
(await client.start()).isOkOr:
raiseAssert error
@ -114,13 +112,13 @@ suite "LM API health checking":
await serviceNode.stop()
asyncTest "RequestShardTopicsHealth, check PubsubTopic health":
client.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
await client.node.connectToNodes(@[servicePeerInfo])
client.waku.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
await client.waku.node.connectToNodes(@[servicePeerInfo])
var isHealthy = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let req = RequestShardTopicsHealth.request(client.brokerCtx, @[DefaultShard]).valueOr:
let req = RequestShardTopicsHealth.request(client.waku.brokerCtx, @[DefaultShard]).valueOr:
raiseAssert "RequestShardTopicsHealth failed"
if req.topicHealth.len > 0:
@ -134,22 +132,22 @@ suite "LM API health checking":
asyncTest "RequestShardTopicsHealth, check disconnected PubsubTopic":
const GhostShard = PubsubTopic("/waku/2/rs/1/666")
client.node.wakuRelay.subscribe(GhostShard, dummyHandler)
client.waku.node.wakuRelay.subscribe(GhostShard, dummyHandler)
let req = RequestShardTopicsHealth.request(client.brokerCtx, @[GhostShard]).valueOr:
let req = RequestShardTopicsHealth.request(client.waku.brokerCtx, @[GhostShard]).valueOr:
raiseAssert "Request failed"
check req.topicHealth.len > 0
check req.topicHealth[0].health == TopicHealth.UNHEALTHY
asyncTest "RequestProtocolHealth, check relay status":
await client.node.connectToNodes(@[servicePeerInfo])
await client.waku.node.connectToNodes(@[servicePeerInfo])
var isReady = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let relayReq = await RequestProtocolHealth.request(
client.brokerCtx, WakuProtocol.RelayProtocol
client.waku.brokerCtx, WakuProtocol.RelayProtocol
)
if relayReq.isOk() and relayReq.get().healthStatus.health == HealthStatus.READY:
isReady = true
@ -158,14 +156,16 @@ suite "LM API health checking":
check isReady == true
let storeReq =
await RequestProtocolHealth.request(client.brokerCtx, WakuProtocol.StoreProtocol)
let storeReq = await RequestProtocolHealth.request(
client.waku.brokerCtx, WakuProtocol.StoreProtocol
)
if storeReq.isOk():
check storeReq.get().healthStatus.health != HealthStatus.READY
asyncTest "RequestProtocolHealth, check unmounted protocol":
let req =
await RequestProtocolHealth.request(client.brokerCtx, WakuProtocol.StoreProtocol)
let req = await RequestProtocolHealth.request(
client.waku.brokerCtx, WakuProtocol.StoreProtocol
)
check req.isOk()
let status = req.get().healthStatus
@ -173,16 +173,16 @@ suite "LM API health checking":
check status.desc.isNone()
asyncTest "RequestConnectionStatus, check connectivity state":
let initialReq = RequestConnectionStatus.request(client.brokerCtx).valueOr:
let initialReq = RequestConnectionStatus.request(client.waku.brokerCtx).valueOr:
raiseAssert "RequestConnectionStatus failed"
check initialReq.connectionStatus == ConnectionStatus.Disconnected
await client.node.connectToNodes(@[servicePeerInfo])
await client.waku.node.connectToNodes(@[servicePeerInfo])
var isConnected = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let req = RequestConnectionStatus.request(client.brokerCtx).valueOr:
let req = RequestConnectionStatus.request(client.waku.brokerCtx).valueOr:
raiseAssert "RequestConnectionStatus failed"
if req.connectionStatus == ConnectionStatus.PartiallyConnected or
@ -194,29 +194,30 @@ suite "LM API health checking":
check isConnected == true
asyncTest "EventConnectionStatusChange, detect connect and disconnect":
let connectFuture =
waitForConnectionStatus(client.brokerCtx, ConnectionStatus.PartiallyConnected)
let connectFuture = waitForConnectionStatus(
client.waku.brokerCtx, ConnectionStatus.PartiallyConnected
)
await client.node.connectToNodes(@[servicePeerInfo])
await client.waku.node.connectToNodes(@[servicePeerInfo])
await connectFuture
let disconnectFuture =
waitForConnectionStatus(client.brokerCtx, ConnectionStatus.Disconnected)
await client.node.disconnectNode(servicePeerInfo)
waitForConnectionStatus(client.waku.brokerCtx, ConnectionStatus.Disconnected)
await client.waku.node.disconnectNode(servicePeerInfo)
await disconnectFuture
asyncTest "EventShardTopicHealthChange, detect health improvement":
client.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
client.waku.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
let healthEventFuture = waitForShardHealthy(client.brokerCtx)
let healthEventFuture = waitForShardHealthy(client.waku.brokerCtx)
await client.node.connectToNodes(@[servicePeerInfo])
await client.waku.node.connectToNodes(@[servicePeerInfo])
let event = await healthEventFuture
check event.topic == DefaultShard
asyncTest "RequestHealthReport, check aggregate report":
let req = await RequestHealthReport.request(client.brokerCtx)
let req = await RequestHealthReport.request(client.waku.brokerCtx)
check req.isOk()
@ -228,7 +229,8 @@ suite "LM API health checking":
asyncTest "RequestContentTopicsHealth, smoke test":
let fictionalTopic = ContentTopic("/waku/2/this-does-not-exist/proto")
let req = RequestContentTopicsHealth.request(client.brokerCtx, @[fictionalTopic])
let req =
RequestContentTopicsHealth.request(client.waku.brokerCtx, @[fictionalTopic])
check req.isOk()
@ -241,20 +243,20 @@ suite "LM API health checking":
let cTopic = ContentTopic("/waku/2/my-content-topic/proto")
let shardReq =
RequestRelayShard.request(client.brokerCtx, none(PubsubTopic), cTopic)
RequestRelayShard.request(client.waku.brokerCtx, none(PubsubTopic), cTopic)
check shardReq.isOk()
let targetShard = $shardReq.get().relayShard
client.node.wakuRelay.subscribe(targetShard, dummyHandler)
client.waku.node.wakuRelay.subscribe(targetShard, dummyHandler)
serviceNode.wakuRelay.subscribe(targetShard, dummyHandler)
await client.node.connectToNodes(@[servicePeerInfo])
await client.waku.node.connectToNodes(@[servicePeerInfo])
var isHealthy = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let req = RequestContentTopicsHealth.request(client.brokerCtx, @[cTopic]).valueOr:
let req = RequestContentTopicsHealth.request(client.waku.brokerCtx, @[cTopic]).valueOr:
raiseAssert "Request failed"
if req.contentTopicHealth.len > 0:
@ -268,7 +270,7 @@ suite "LM API health checking":
check isHealthy == true
asyncTest "RequestProtocolHealth, edge mode smoke test":
var edgeWaku: Waku
var edgeWaku: LogosDelivery
lockNewGlobalBrokerContext:
var edgeConf = defaultWakuNodeConf().valueOr:
@ -281,20 +283,18 @@ suite "LM API health checking":
edgeConf.maxMessageSize = "150 KiB"
edgeConf.rest = false
edgeWaku = (await createNode(edgeConf)).valueOr:
edgeWaku = (await LogosDelivery.new(edgeConf)).valueOr:
raiseAssert "Failed to create edge node: " & error
edgeWaku.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount edge messaging: " & error
(await edgeWaku.start()).isOkOr:
raiseAssert "Failed to start edge waku: " & error
let relayReq = await RequestProtocolHealth.request(
edgeWaku.brokerCtx, WakuProtocol.RelayProtocol
edgeWaku.waku.brokerCtx, WakuProtocol.RelayProtocol
)
check relayReq.isOk()
check relayReq.get().healthStatus.health == HealthStatus.NOT_MOUNTED
check not edgeWaku.node.wakuFilterClient.isNil()
check not edgeWaku.waku.node.wakuFilterClient.isNil()
discard await edgeWaku.stop()

View File

@ -98,7 +98,7 @@ proc createApiNodeConf(numShards: uint16 = 1): WakuNodeConf =
type TestNetwork = ref object
storeNode: WakuNode
publisher: WakuNode
subscriber: Waku
subscriber: LogosDelivery
storeNodePeerInfo: RemotePeerInfo
missedPayload: seq[byte]
@ -158,12 +158,11 @@ proc setupNetwork(testTopic: ContentTopic): Future[TestNetwork] {.async.} =
# subscriber: created before the publish so the message timestamp lands after
# its RecvService startTimeToCheck watermark
var subscriber: Waku
var subscriber: LogosDelivery
lockNewGlobalBrokerContext:
subscriber = (await createNode(createApiNodeConf(numShards))).expect(
subscriber = (await LogosDelivery.new(createApiNodeConf(numShards))).expect(
"Failed to create subscriber"
)
subscriber.mountMessagingClient().expect("Failed to mount messaging")
(await subscriber.start()).expect("Failed to start subscriber")
# publish while the subscriber is offline: the message reaches the archive but
@ -188,7 +187,7 @@ proc setupNetwork(testTopic: ContentTopic): Future[TestNetwork] {.async.} =
raiseAssert "Message was not archived in time"
# subscribe to the content topic; with no peers yet the subscriber stays offline
(await subscriber.subscribe(testTopic)).expect("Failed to subscribe")
(await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe")
return TestNetwork(
storeNode: storeNode,
@ -217,11 +216,11 @@ suite "Messaging API, Receive Service (store recovery)":
defer:
await net.teardown()
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
defer:
await eventManager.teardown()
await net.subscriber.node.connectToNodes(@[net.storeNodePeerInfo])
await net.subscriber.waku.node.connectToNodes(@[net.storeNodePeerInfo])
await net.subscriber.messagingClient.recvService.checkStore()
check await eventManager.waitForEvents(TestTimeout)
@ -236,15 +235,15 @@ suite "Messaging API, Receive Service (store recovery)":
defer:
await net.teardown()
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
defer:
await eventManager.teardown()
# sync on coming online (the transition that fires the backfill) before asserting
let onlineFut = waitForConnectionStatus(
net.subscriber.brokerCtx, ConnectionStatus.PartiallyConnected
net.subscriber.waku.brokerCtx, ConnectionStatus.PartiallyConnected
)
await net.subscriber.node.connectToNodes(@[net.storeNodePeerInfo])
await net.subscriber.waku.node.connectToNodes(@[net.storeNodePeerInfo])
await onlineFut
check await eventManager.waitForEvents(TestTimeout)

View File

@ -237,12 +237,10 @@ suite "Waku API - Send":
)
asyncTest "Check API availability (unhealthy node)":
var node: Waku
var node: LogosDelivery
lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr:
node = (await LogosDelivery.new(createApiNodeConf())).valueOr:
raiseAssert error
node.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount messaging: " & error
(await node.start()).isOkOr:
raiseAssert "Failed to start Waku node: " & error
# node is not connected !
@ -251,7 +249,7 @@ suite "Waku API - Send":
ContentTopic("/waku/2/default-content/proto"), "test payload"
)
let sendResult = await node.send(envelope)
let sendResult = await node.messagingClient.send(envelope)
# TODO: The API is not enforcing a health check before the send,
# so currently this test cannot successfully fail to send.
@ -261,20 +259,18 @@ suite "Waku API - Send":
raiseAssert "Failed to stop node: " & error
asyncTest "Send fully validated":
var node: Waku
var node: LogosDelivery
lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr:
node = (await LogosDelivery.new(createApiNodeConf())).valueOr:
raiseAssert error
node.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount messaging: " & error
(await node.start()).isOkOr:
raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(
await node.waku.node.connectToNodes(
@[relayNode1PeerInfo, lightpushNodePeerInfo, storeNodePeerInfo]
)
let eventManager = newSendEventListenerManager(node.brokerCtx)
let eventManager = newSendEventListenerManager(node.waku.brokerCtx)
defer:
await eventManager.teardown()
@ -282,7 +278,7 @@ suite "Waku API - Send":
ContentTopic("/waku/2/default-content/proto"), "test payload"
)
let requestId = (await node.send(envelope)).valueOr:
let requestId = (await node.messagingClient.send(envelope)).valueOr:
raiseAssert error
# Wait for events with timeout
@ -297,18 +293,16 @@ suite "Waku API - Send":
raiseAssert "Failed to stop node: " & error
asyncTest "Send only propagates":
var node: Waku
var node: LogosDelivery
lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr:
node = (await LogosDelivery.new(createApiNodeConf())).valueOr:
raiseAssert error
node.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount messaging: " & error
(await node.start()).isOkOr:
raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(@[relayNode1PeerInfo])
await node.waku.node.connectToNodes(@[relayNode1PeerInfo])
let eventManager = newSendEventListenerManager(node.brokerCtx)
let eventManager = newSendEventListenerManager(node.waku.brokerCtx)
defer:
await eventManager.teardown()
@ -316,7 +310,7 @@ suite "Waku API - Send":
ContentTopic("/waku/2/default-content/proto"), "test payload"
)
let requestId = (await node.send(envelope)).valueOr:
let requestId = (await node.messagingClient.send(envelope)).valueOr:
raiseAssert error
# Wait for events with timeout
@ -329,18 +323,16 @@ suite "Waku API - Send":
raiseAssert "Failed to stop node: " & error
asyncTest "Send only propagates fallback to lightpush":
var node: Waku
var node: LogosDelivery
lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr:
node = (await LogosDelivery.new(createApiNodeConf())).valueOr:
raiseAssert error
node.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount messaging: " & error
(await node.start()).isOkOr:
raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(@[lightpushNodePeerInfo])
await node.waku.node.connectToNodes(@[lightpushNodePeerInfo])
let eventManager = newSendEventListenerManager(node.brokerCtx)
let eventManager = newSendEventListenerManager(node.waku.brokerCtx)
defer:
await eventManager.teardown()
@ -348,7 +340,7 @@ suite "Waku API - Send":
ContentTopic("/waku/2/default-content/proto"), "test payload"
)
let requestId = (await node.send(envelope)).valueOr:
let requestId = (await node.messagingClient.send(envelope)).valueOr:
raiseAssert error
# Wait for events with timeout
@ -361,18 +353,16 @@ suite "Waku API - Send":
raiseAssert "Failed to stop node: " & error
asyncTest "Send fully validates fallback to lightpush":
var node: Waku
var node: LogosDelivery
lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr:
node = (await LogosDelivery.new(createApiNodeConf())).valueOr:
raiseAssert error
node.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount messaging: " & error
(await node.start()).isOkOr:
raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(@[lightpushNodePeerInfo, storeNodePeerInfo])
await node.waku.node.connectToNodes(@[lightpushNodePeerInfo, storeNodePeerInfo])
let eventManager = newSendEventListenerManager(node.brokerCtx)
let eventManager = newSendEventListenerManager(node.waku.brokerCtx)
defer:
await eventManager.teardown()
@ -380,7 +370,7 @@ suite "Waku API - Send":
ContentTopic("/waku/2/default-content/proto"), "test payload"
)
let requestId = (await node.send(envelope)).valueOr:
let requestId = (await node.messagingClient.send(envelope)).valueOr:
raiseAssert error
# Wait for events with timeout
@ -417,18 +407,16 @@ suite "Waku API - Send":
).isOkOr:
raiseAssert "Failed to subscribe fakeLightpushNode: " & error
var node: Waku
var node: LogosDelivery
lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf(cli_args.WakuMode.Edge))).valueOr:
node = (await LogosDelivery.new(createApiNodeConf(cli_args.WakuMode.Edge))).valueOr:
raiseAssert error
node.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount messaging: " & error
(await node.start()).isOkOr:
raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(@[fakeLightpushNodePeerInfo])
await node.waku.node.connectToNodes(@[fakeLightpushNodePeerInfo])
let eventManager = newSendEventListenerManager(node.brokerCtx)
let eventManager = newSendEventListenerManager(node.waku.brokerCtx)
defer:
await eventManager.teardown()
@ -436,7 +424,7 @@ suite "Waku API - Send":
ContentTopic("/waku/2/default-content/proto"), "test payload"
)
let requestId = (await node.send(envelope)).valueOr:
let requestId = (await node.messagingClient.send(envelope)).valueOr:
raiseAssert error
echo "Sent message with requestId=", requestId

View File

@ -64,7 +64,7 @@ proc waitForEvents(
type TestNetwork = ref object
publisher: WakuNode # Relay node that publishes messages in tests.
meshBuddy: WakuNode # Extra relay peer for publisher's mesh (Edge tests only).
subscriber: Waku
subscriber: LogosDelivery
# The receiver node in tests. Edge node in edge tests, Core node in relay tests.
publisherPeerInfo: RemotePeerInfo
@ -83,11 +83,10 @@ proc createApiNodeConf(
conf.rest = false
result = conf
proc setupSubscriberNode(conf: WakuNodeConf): Future[Waku] {.async.} =
var node: Waku
proc setupSubscriberNode(conf: WakuNodeConf): Future[LogosDelivery] {.async.} =
var node: LogosDelivery
lockNewGlobalBrokerContext:
node = (await createNode(conf)).expect("Failed to create subscriber node")
node.mountMessagingClient().expect("Failed to mount messaging")
node = (await LogosDelivery.new(conf)).expect("Failed to create subscriber node")
(await node.start()).expect("Failed to start subscriber node")
return node
@ -141,7 +140,7 @@ proc setupNetwork(
net.subscriber = await setupSubscriberNode(createApiNodeConf(mode, numShards))
await net.subscriber.node.connectToNodes(@[net.publisherPeerInfo])
await net.subscriber.waku.node.connectToNodes(@[net.publisherPeerInfo])
return net
@ -171,28 +170,30 @@ proc waitForMesh(node: WakuNode, shard: PubsubTopic) {.async.} =
await sleepAsync(100.milliseconds)
raise newException(ValueError, "GossipSub Mesh failed to stabilize on " & shard)
proc waitForEdgeSubs(w: Waku, shard: PubsubTopic) {.async.} =
proc waitForEdgeSubs(w: LogosDelivery, shard: PubsubTopic) {.async.} =
let deadline = Moment.now() + EdgeWaitTimeout
while Moment.now() < deadline:
if w.node.subscriptionManager.edgeFilterPeerCount(shard) > 0:
if w.waku.node.subscriptionManager.edgeFilterPeerCount(shard) > 0:
return
await sleepAsync(100.milliseconds)
raise newException(ValueError, "Edge filter subscription failed on " & shard)
proc edgePeersReached(w: Waku, shard: PubsubTopic, n: int): Future[bool] {.async.} =
proc edgePeersReached(
w: LogosDelivery, shard: PubsubTopic, n: int
): Future[bool] {.async.} =
let deadline = Moment.now() + EdgeWaitTimeout
while Moment.now() < deadline:
if w.node.subscriptionManager.edgeFilterPeerCount(shard) >= n:
if w.waku.node.subscriptionManager.edgeFilterPeerCount(shard) >= n:
return true
await sleepAsync(100.milliseconds)
return false
proc edgePeersDroppedBelow(
w: Waku, shard: PubsubTopic, n: int
w: LogosDelivery, shard: PubsubTopic, n: int
): Future[bool] {.async.} =
let deadline = Moment.now() + EdgeWaitTimeout
while Moment.now() < deadline:
if w.node.subscriptionManager.edgeFilterPeerCount(shard) < n:
if w.waku.node.subscriptionManager.edgeFilterPeerCount(shard) < n:
return true
await sleepAsync(100.milliseconds)
return false
@ -201,7 +202,7 @@ proc publishToMesh(
net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte]
): Future[Result[int, string]] {.async.} =
# Publishes a message from "publisher" via relay into the gossipsub mesh.
let shard = net.subscriber.node.getRelayShard(contentTopic)
let shard = net.subscriber.waku.node.getRelayShard(contentTopic)
await waitForMesh(net.publisher, shard)
let msg = WakuMessage(
payload: payload, contentTopic: contentTopic, version: 0, timestamp: now()
@ -213,7 +214,7 @@ proc publishToMeshAfterEdgeReady(
): Future[Result[int, string]] {.async.} =
# First, ensure "subscriber" node (an edge node) is subscribed and ready to receive.
# Afterwards, "publisher" (relay node) sends the message in the gossipsub network.
let shard = net.subscriber.node.getRelayShard(contentTopic)
let shard = net.subscriber.waku.node.getRelayShard(contentTopic)
await waitForEdgeSubs(net.subscriber, shard)
return await net.publishToMesh(contentTopic, payload)
@ -224,11 +225,11 @@ suite "Messaging API, SubscriptionManager":
await net.teardown()
let testTopic = ContentTopic("/waku/2/test-content/proto")
(await net.subscriber.subscribe(testTopic)).expect(
(await net.subscriber.waku.subscribe(testTopic)).expect(
"subscriberNode failed to subscribe"
)
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
defer:
await eventManager.teardown()
@ -247,9 +248,9 @@ suite "Messaging API, SubscriptionManager":
let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto")
let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto")
(await net.subscriber.subscribe(subbedTopic)).expect("failed to subscribe")
(await net.subscriber.waku.subscribe(subbedTopic)).expect("failed to subscribe")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
defer:
await eventManager.teardown()
@ -267,10 +268,10 @@ suite "Messaging API, SubscriptionManager":
let testTopic = ContentTopic("/waku/2/unsub-test/proto")
(await net.subscriber.subscribe(testTopic)).expect("failed to subscribe")
net.subscriber.unsubscribe(testTopic).expect("failed to unsubscribe")
(await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe")
net.subscriber.waku.unsubscribe(testTopic).expect("failed to unsubscribe")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
defer:
await eventManager.teardown()
@ -288,14 +289,14 @@ suite "Messaging API, SubscriptionManager":
let topicA = ContentTopic("/waku/2/topic-a/proto")
let topicB = ContentTopic("/waku/2/topic-b/proto")
(await net.subscriber.subscribe(topicA)).expect("failed to sub A")
(await net.subscriber.subscribe(topicB)).expect("failed to sub B")
(await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A")
(await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
defer:
await eventManager.teardown()
net.subscriber.unsubscribe(topicA).expect("failed to unsub A")
net.subscriber.waku.unsubscribe(topicA).expect("failed to unsub A")
discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect(
"Publish A failed"
@ -314,11 +315,11 @@ suite "Messaging API, SubscriptionManager":
let glitchTopic = ContentTopic("/waku/2/glitch/proto")
(await net.subscriber.subscribe(glitchTopic)).expect("failed to sub")
(await net.subscriber.subscribe(glitchTopic)).expect("failed to double sub")
net.subscriber.unsubscribe(glitchTopic).expect("failed to unsub")
(await net.subscriber.waku.subscribe(glitchTopic)).expect("failed to sub")
(await net.subscriber.waku.subscribe(glitchTopic)).expect("failed to double sub")
net.subscriber.waku.unsubscribe(glitchTopic).expect("failed to unsub")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
defer:
await eventManager.teardown()
@ -337,9 +338,9 @@ suite "Messaging API, SubscriptionManager":
let testTopic = ContentTopic("/waku/2/resub-test/proto")
# Subscribe
(await net.subscriber.subscribe(testTopic)).expect("Initial sub failed")
(await net.subscriber.waku.subscribe(testTopic)).expect("Initial sub failed")
var eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
var eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
discard
(await net.publishToMesh(testTopic, "Msg 1".toBytes())).expect("Pub 1 failed")
@ -347,8 +348,8 @@ suite "Messaging API, SubscriptionManager":
await eventManager.teardown()
# Unsubscribe and verify teardown
net.subscriber.unsubscribe(testTopic).expect("Unsub failed")
eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
net.subscriber.waku.unsubscribe(testTopic).expect("Unsub failed")
eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
discard
(await net.publishToMesh(testTopic, "Ghost".toBytes())).expect("Ghost pub failed")
@ -357,8 +358,8 @@ suite "Messaging API, SubscriptionManager":
await eventManager.teardown()
# Resubscribe
(await net.subscriber.subscribe(testTopic)).expect("Resub failed")
eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
(await net.subscriber.waku.subscribe(testTopic)).expect("Resub failed")
eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
discard
(await net.publishToMesh(testTopic, "Msg 2".toBytes())).expect("Pub 2 failed")
@ -376,15 +377,15 @@ suite "Messaging API, SubscriptionManager":
# generate two content topics that land in two different shards
var i = 0
while net.subscriber.node.getRelayShard(topicA) ==
net.subscriber.node.getRelayShard(topicB):
while net.subscriber.waku.node.getRelayShard(topicA) ==
net.subscriber.waku.node.getRelayShard(topicB):
topicB = ContentTopic("/appB" & $i & "/2/shard-test-b/proto")
inc i
(await net.subscriber.subscribe(topicA)).expect("failed to sub A")
(await net.subscriber.subscribe(topicB)).expect("failed to sub B")
(await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A")
(await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 2)
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 2)
defer:
await eventManager.teardown()
@ -411,7 +412,7 @@ suite "Messaging API, SubscriptionManager":
proc verifyNetworkState(expected: seq[ContentTopic]) {.async.} =
let eventManager =
newReceiveEventListenerManager(net.subscriber.brokerCtx, expected.len)
newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, expected.len)
for topic in allTopics:
discard (await net.publishToMesh(topic, "Stress Payload".toBytes())).expect(
@ -439,7 +440,7 @@ suite "Messaging API, SubscriptionManager":
# subscribe to all content topics we generated
for t in allTopics:
(await net.subscriber.subscribe(t)).expect("sub failed")
(await net.subscriber.waku.subscribe(t)).expect("sub failed")
activeSubs.add(t)
await verifyNetworkState(activeSubs)
@ -447,7 +448,7 @@ suite "Messaging API, SubscriptionManager":
# unsubscribe from some content topics
for i in 0 ..< 50:
let t = allTopics[i]
net.subscriber.unsubscribe(t).expect("unsub failed")
net.subscriber.waku.unsubscribe(t).expect("unsub failed")
let idx = activeSubs.find(t)
if idx >= 0:
@ -458,7 +459,7 @@ suite "Messaging API, SubscriptionManager":
# re-subscribe to some content topics
for i in 0 ..< 25:
let t = allTopics[i]
(await net.subscriber.subscribe(t)).expect("resub failed")
(await net.subscriber.waku.subscribe(t)).expect("resub failed")
activeSubs.add(t)
await verifyNetworkState(activeSubs)
@ -469,9 +470,9 @@ suite "Messaging API, SubscriptionManager":
await net.teardown()
let testTopic = ContentTopic("/waku/2/test-content/proto")
(await net.subscriber.subscribe(testTopic)).expect("failed to subscribe")
(await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
defer:
await eventManager.teardown()
@ -490,9 +491,9 @@ suite "Messaging API, SubscriptionManager":
let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto")
let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto")
(await net.subscriber.subscribe(subbedTopic)).expect("failed to subscribe")
(await net.subscriber.waku.subscribe(subbedTopic)).expect("failed to subscribe")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
defer:
await eventManager.teardown()
@ -510,10 +511,10 @@ suite "Messaging API, SubscriptionManager":
let testTopic = ContentTopic("/waku/2/unsub-test/proto")
(await net.subscriber.subscribe(testTopic)).expect("failed to subscribe")
net.subscriber.unsubscribe(testTopic).expect("failed to unsubscribe")
(await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe")
net.subscriber.waku.unsubscribe(testTopic).expect("failed to unsubscribe")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
defer:
await eventManager.teardown()
@ -531,17 +532,17 @@ suite "Messaging API, SubscriptionManager":
let topicA = ContentTopic("/waku/2/topic-a/proto")
let topicB = ContentTopic("/waku/2/topic-b/proto")
(await net.subscriber.subscribe(topicA)).expect("failed to sub A")
(await net.subscriber.subscribe(topicB)).expect("failed to sub B")
(await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A")
(await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B")
let shard = net.subscriber.node.getRelayShard(topicA)
let shard = net.subscriber.waku.node.getRelayShard(topicA)
await waitForEdgeSubs(net.subscriber, shard)
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
defer:
await eventManager.teardown()
net.subscriber.unsubscribe(topicA).expect("failed to unsub A")
net.subscriber.waku.unsubscribe(topicA).expect("failed to unsub A")
discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect(
"Publish A failed"
@ -560,9 +561,9 @@ suite "Messaging API, SubscriptionManager":
let testTopic = ContentTopic("/waku/2/resub-test/proto")
(await net.subscriber.subscribe(testTopic)).expect("Initial sub failed")
(await net.subscriber.waku.subscribe(testTopic)).expect("Initial sub failed")
var eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
var eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 1".toBytes())).expect(
"Pub 1 failed"
)
@ -570,8 +571,8 @@ suite "Messaging API, SubscriptionManager":
require await eventManager.waitForEvents(TestTimeout)
await eventManager.teardown()
net.subscriber.unsubscribe(testTopic).expect("Unsub failed")
eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
net.subscriber.waku.unsubscribe(testTopic).expect("Unsub failed")
eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
discard
(await net.publishToMesh(testTopic, "Ghost".toBytes())).expect("Ghost pub failed")
@ -579,8 +580,8 @@ suite "Messaging API, SubscriptionManager":
check not await eventManager.waitForEvents(NegativeTestTimeout)
await eventManager.teardown()
(await net.subscriber.subscribe(testTopic)).expect("Resub failed")
eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
(await net.subscriber.waku.subscribe(testTopic)).expect("Resub failed")
eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 2".toBytes())).expect(
"Pub 2 failed"
@ -640,19 +641,19 @@ suite "Messaging API, SubscriptionManager":
await meshBuddy.connectToNodes(@[publisherPeerInfo])
let conf = createApiNodeConf(cli_args.WakuMode.Edge, numShards)
var subscriber: Waku
var subscriber: LogosDelivery
lockNewGlobalBrokerContext:
subscriber = (await createNode(conf)).expect("Failed to create edge subscriber")
subscriber.mountMessagingClient().expect("Failed to mount messaging")
subscriber =
(await LogosDelivery.new(conf)).expect("Failed to create edge subscriber")
(await subscriber.start()).expect("Failed to start edge subscriber")
# Connect edge subscriber to both filter servers so selectPeers finds both
await subscriber.node.connectToNodes(@[publisherPeerInfo, meshBuddyPeerInfo])
await subscriber.waku.node.connectToNodes(@[publisherPeerInfo, meshBuddyPeerInfo])
let testTopic = ContentTopic("/waku/2/failover-test/proto")
let shard = subscriber.node.getRelayShard(testTopic)
let shard = subscriber.waku.node.getRelayShard(testTopic)
(await subscriber.subscribe(testTopic)).expect("Failed to subscribe")
(await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe")
# Wait for dialing both filter servers (HealthyThreshold = 2)
check await edgePeersReached(subscriber, shard, 2)
@ -660,7 +661,7 @@ suite "Messaging API, SubscriptionManager":
# Verify message delivery with both servers alive
await waitForMesh(publisher, shard)
var eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1)
var eventManager = newReceiveEventListenerManager(subscriber.waku.brokerCtx, 1)
let msg1 = WakuMessage(
payload: "Before failover".toBytes(),
contentTopic: testTopic,
@ -674,14 +675,14 @@ suite "Messaging API, SubscriptionManager":
await eventManager.teardown()
# Disconnect meshBuddy from edge (keeps relay mesh alive for publishing)
await subscriber.node.disconnectNode(meshBuddyPeerInfo)
await subscriber.waku.node.disconnectNode(meshBuddyPeerInfo)
# Wait for the dead peer to be pruned
check await edgePeersDroppedBelow(subscriber, shard, 2)
check subscriber.node.subscriptionManager.edgeFilterPeerCount(shard) >= 1
check subscriber.waku.node.subscriptionManager.edgeFilterPeerCount(shard) >= 1
# Verify messages still arrive through the surviving filter server (publisher)
eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1)
eventManager = newReceiveEventListenerManager(subscriber.waku.brokerCtx, 1)
let msg2 = WakuMessage(
payload: "After failover".toBytes(),
contentTopic: testTopic,
@ -769,33 +770,33 @@ suite "Messaging API, SubscriptionManager":
await sparePeer.connectToNodes(@[publisherPeerInfo])
let conf = createApiNodeConf(cli_args.WakuMode.Edge, numShards)
var subscriber: Waku
var subscriber: LogosDelivery
lockNewGlobalBrokerContext:
subscriber = (await createNode(conf)).expect("Failed to create edge subscriber")
subscriber.mountMessagingClient().expect("Failed to mount messaging")
subscriber =
(await LogosDelivery.new(conf)).expect("Failed to create edge subscriber")
(await subscriber.start()).expect("Failed to start edge subscriber")
await subscriber.node.connectToNodes(
await subscriber.waku.node.connectToNodes(
@[publisherPeerInfo, meshBuddyPeerInfo, sparePeerInfo]
)
let testTopic = ContentTopic("/waku/2/replacement-test/proto")
let shard = subscriber.node.getRelayShard(testTopic)
let shard = subscriber.waku.node.getRelayShard(testTopic)
(await subscriber.subscribe(testTopic)).expect("Failed to subscribe")
(await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe")
# Wait for 2 confirmed peers (HealthyThreshold). The 3rd is available but not dialed.
check await edgePeersReached(subscriber, shard, 2)
require subscriber.node.subscriptionManager.edgeFilterPeerCount(shard) == 2
require subscriber.waku.node.subscriptionManager.edgeFilterPeerCount(shard) == 2
await subscriber.node.disconnectNode(meshBuddyPeerInfo)
await subscriber.waku.node.disconnectNode(meshBuddyPeerInfo)
# Wait for the sub loop to detect the loss and dial a replacement
check await edgePeersReached(subscriber, shard, 2)
await waitForMesh(publisher, shard)
var eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1)
var eventManager = newReceiveEventListenerManager(subscriber.waku.brokerCtx, 1)
let msg = WakuMessage(
payload: "After replacement".toBytes(),
contentTopic: testTopic,

View File

@ -53,14 +53,12 @@ suite "Reliable Channel - ingress":
contentTopic = ContentTopic("/reliable-channel/test/proto")
let appPayload = "hello reliable channel".toBytes()
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
## Noop encryption providers so the Encrypt/Decrypt brokers have
@ -121,14 +119,12 @@ suite "Reliable Channel - ingress":
contentTopic = ContentTopic("/reliable-channel/test/proto")
let appPayload = "foreign payload".toBytes()
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -180,14 +176,12 @@ suite "Reliable Channel - send state machine":
contentTopic = ContentTopic("/reliable-channel/test/sm-success")
fakeMsgReqId = RequestId("fake-msg-req-1")
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -247,14 +241,12 @@ suite "Reliable Channel - send state machine":
channelId = ChannelId("sm-multi-channel")
contentTopic = ContentTopic("/reliable-channel/test/sm-multi")
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -350,14 +342,12 @@ suite "Reliable Channel - send state machine":
channelId = ChannelId("sm-race-channel")
contentTopic = ContentTopic("/reliable-channel/test/sm-race")
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -459,12 +449,10 @@ suite "Reliable Channel - SDS persistence":
Persistency.reset()
removeDir(root)
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
lockNewGlobalBrokerContext:
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -529,14 +517,12 @@ suite "Reliable Channel - SDS lifecycle":
let payload1 = "first message".toBytes()
let payload2 = "second message".toBytes()
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -602,14 +588,12 @@ suite "Reliable Channel - SDS lifecycle":
contentTopic = ContentTopic("/reliable-channel/test/dup")
let appPayload = "deliver once".toBytes()
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -661,14 +645,12 @@ suite "Reliable Channel - SDS lifecycle":
channelId = ChannelId("sds-foreign-channel")
contentTopic = ContentTopic("/reliable-channel/test/foreign")
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -723,14 +705,12 @@ suite "Reliable Channel - SDS lifecycle":
Persistency.reset()
removeDir(root)
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -807,14 +787,12 @@ suite "Reliable Channel - SDS protocol semantics":
channelId = ChannelId("sds-semantics-channel")
contentTopic = ContentTopic("/reliable-channel/test/semantics")
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -877,14 +855,12 @@ suite "Reliable Channel - SDS protocol semantics":
Persistency.reset()
removeDir(root)
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -967,14 +943,12 @@ suite "Reliable Channel - SDS protocol semantics":
let payloads =
@["chain first".toBytes(), "chain second".toBytes(), "chain third".toBytes()]
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -1044,14 +1018,12 @@ suite "Reliable Channel - SDS protocol semantics":
contentTopic = ContentTopic("/reliable-channel/test/sync")
let appPayload = "real message".toBytes()
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -1119,14 +1091,12 @@ suite "Reliable Channel - SDS protocol semantics":
contentTopic = ContentTopic("/reliable-channel/test/unique-id")
let appPayload = "ok".toBytes()
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
setNoopEncryption()
@ -1189,12 +1159,10 @@ suite "Reliable Channel - SDS protocol semantics":
(await waku.stop()).expect("stop")
asyncTest "manager rejects operations on unknown channels":
var waku: Waku
var waku: LogosDelivery
var manager: ReliableChannelManager
lockNewGlobalBrokerContext:
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode")
manager = waku.reliableChannelManager
check (await manager.send(ChannelId("no-such-channel"), "x".toBytes())).isErr()

View File

@ -228,8 +228,9 @@ suite "Health Monitor - events":
nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata")
await nodeA.start()
let ds =
MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient")
let ds = MessagingClient
.new(MessagingClientConf(useP2PReliability: false), nodeA)
.expect("Failed to create MessagingClient")
ds.start().expect("Failed to start MessagingClient")
let monitorA = NodeHealthMonitor.new(nodeA)
@ -332,8 +333,9 @@ suite "Health Monitor - events":
nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata")
await nodeA.start()
let ds =
MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient")
let ds = MessagingClient
.new(MessagingClientConf(useP2PReliability: false), nodeA)
.expect("Failed to create MessagingClient")
ds.start().expect("Failed to start MessagingClient")
let subMgr = nodeA.subscriptionManager