diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 278f37a05..a407e37f4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -101,7 +101,14 @@ jobs: touch nimbledeps/.nimble-setup - name: Build binaries - run: make V=1 all + # -j1: `all` builds wakunode2 and liblogosdelivery, each via `nimble + # `. Under -j2 the two nimble invocations re-resolve git deps + # concurrently and clobber each other in the shared ~/.nimble/pkgcache + # (e.g. "destination already exists", "could not lock config file"), + # failing the build. Serializing the targets removes the race; Nim's + # own --parallelBuild still parallelizes compilation. Mirrors the test + # job, which already forces -j1. + run: make V=1 -j1 all build-windows: needs: changes diff --git a/examples/api_example/api_example.nim b/examples/api_example/api_example.nim index 9e96858db..8c8759191 100644 --- a/examples/api_example/api_example.nim +++ b/examples/api_example/api_example.nim @@ -7,7 +7,7 @@ type CliArgs = object defaultValue: "", desc: "ETH RPC Endpoint, if passed, RLN is enabled" .}: string -proc periodicSender(w: Waku): Future[void] {.async.} = +proc periodicSender(logos: LogosDelivery): Future[void] {.async.} = let sentListener = MessageSentEvent.listen( proc(event: MessageSentEvent) {.async: (raises: []).} = echo "Message sent with request ID: ", @@ -45,7 +45,7 @@ proc periodicSender(w: Waku): Future[void] {.async.} = payload = "Hello Waku! Message number: " & $counter, ) - let sendRequestId = (await w.send(envelope)).valueOr: + let sendRequestId = (await logos.messagingClient.send(envelope)).valueOr: echo "Failed to send message: ", error quit(QuitFailure) @@ -75,16 +75,12 @@ when isMainModule: conf.preset = "twn" conf.ethClientUrls = @[EthRpcUrl(args.ethRpcEndpoint)] - # Create the node using the library API's createNode function - let node = (waitFor createNode(conf)).valueOr: + # Create the full Logos Messaging stack (Waku + messaging + channels) + let node = (waitFor LogosDelivery.new(conf)).valueOr: echo "Failed to create node: ", error quit(QuitFailure) - echo("Waku node created successfully!") - - node.mountMessagingClient().isOkOr: - echo "Failed to mount messaging: ", error - quit(QuitFailure) + echo("Logos Messaging node created successfully!") # Start the node (waitFor node.start()).isOkOr: diff --git a/library/declare_lib.nim b/library/declare_lib.nim index 77e992eea..eaf8f6315 100644 --- a/library/declare_lib.nim +++ b/library/declare_lib.nim @@ -1,6 +1,6 @@ import ffi import std/locks -import logos_delivery/waku/factory/waku +import logos_delivery declareLibrary("logosdelivery") @@ -8,7 +8,7 @@ var eventCallbackLock: Lock initLock(eventCallbackLock) template requireInitializedNode*( - ctx: ptr FFIContext[Waku], opName: string, onError: untyped + ctx: ptr FFIContext[LogosDelivery], opName: string, onError: untyped ) = if isNil(ctx): let errMsg {.inject.} = opName & " failed: invalid context" @@ -18,7 +18,7 @@ template requireInitializedNode*( onError proc logosdelivery_set_event_callback( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.dynlib, exportc, cdecl.} = if isNil(ctx): echo "error: invalid context in logosdelivery_set_event_callback" diff --git a/library/kernel_api/debug_node_api.nim b/library/kernel_api/debug_node_api.nim index bbce84939..b1229d660 100644 --- a/library/kernel_api/debug_node_api.nim +++ b/library/kernel_api/debug_node_api.nim @@ -22,32 +22,32 @@ proc getMetrics(): string = return defaultRegistry.toText() ## defaultRegistry is {.global.} in metrics module proc waku_version( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = return ok(WakuNodeVersionString) proc waku_listen_addresses( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = ## returns a comma-separated string of the listen addresses - return ok(ctx.myLib[].node.getMultiaddresses().join(",")) + return ok(ctx.myLib[].waku.node.getMultiaddresses().join(",")) proc waku_get_my_enr( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - return ok(ctx.myLib[].node.enr.toURI()) + return ok(ctx.myLib[].waku.node.enr.toURI()) proc waku_get_my_peerid( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - return ok($ctx.myLib[].node.peerId()) + return ok($ctx.myLib[].waku.node.peerId()) proc waku_get_metrics( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = return ok(getMetrics()) proc waku_is_online( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - return ok($ctx.myLib[].healthMonitor.onlineMonitor.amIOnline()) + return ok($ctx.myLib[].waku.healthMonitor.onlineMonitor.amIOnline()) diff --git a/library/kernel_api/discovery_api.nim b/library/kernel_api/discovery_api.nim index 9dec4575a..7ae7d1dea 100644 --- a/library/kernel_api/discovery_api.nim +++ b/library/kernel_api/discovery_api.nim @@ -39,7 +39,7 @@ proc performPeerExchangeRequestTo*( return ok(numPeersRecv) proc waku_discv5_update_bootnodes( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, bootnodes: cstring, @@ -47,14 +47,14 @@ proc waku_discv5_update_bootnodes( ## Updates the bootnode list used for discovering new peers via DiscoveryV5 ## bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]` - updateDiscv5BootstrapNodes($bootnodes, ctx.myLib[]).isOkOr: + updateDiscv5BootstrapNodes($bootnodes, ctx.myLib[].waku).isOkOr: error "UPDATE_DISCV5_BOOTSTRAP_NODES failed", error = error return err($error) return ok("discovery request processed correctly") proc waku_dns_discovery( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, enrTreeUrl: cstring, @@ -69,27 +69,27 @@ proc waku_dns_discovery( return ok(nodes.join(",")) proc waku_start_discv5( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - (await ctx.myLib[].wakuDiscv5.start()).isOkOr: + (await ctx.myLib[].waku.wakuDiscv5.start()).isOkOr: error "START_DISCV5 failed", error = error return err("error starting discv5: " & $error) return ok("discv5 started correctly") proc waku_stop_discv5( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - await ctx.myLib[].wakuDiscv5.stop() + await ctx.myLib[].waku.wakuDiscv5.stop() return ok("discv5 stopped correctly") proc waku_peer_exchange_request( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, numPeers: uint64, ) {.ffi.} = - let numValidPeers = (await performPeerExchangeRequestTo(numPeers, ctx.myLib[])).valueOr: + let numValidPeers = (await performPeerExchangeRequestTo(numPeers, ctx.myLib[].waku)).valueOr: error "waku_peer_exchange_request failed", error = error return err("failed peer exchange: " & $error) diff --git a/library/kernel_api/node_lifecycle_api.nim b/library/kernel_api/node_lifecycle_api.nim index 8ee735f52..279480031 100644 --- a/library/kernel_api/node_lifecycle_api.nim +++ b/library/kernel_api/node_lifecycle_api.nim @@ -13,7 +13,7 @@ import proc createWaku( configJson: cstring, appCallbacks: AppCallbacks = nil -): Future[Result[Waku, string]] {.async.} = +): Future[Result[LogosDelivery, string]] {.async.} = var conf = defaultWakuNodeConf().valueOr: return err("Failed creating node: " & error) @@ -47,19 +47,15 @@ proc createWaku( appCallbacks.relayHandler = nil appCallbacks.topicHealthChangeHandler = nil - # TODO: Convert `confJson` directly to `WakuConf` - var wakuConf = conf.toWakuConf().valueOr: - return err("Configuration error: " & $error) + conf.rest = false ## libwaku never runs the REST server - wakuConf.restServerConf = none(RestServerConf) ## don't want REST in libwaku + let logosRes = (await LogosDelivery.new(conf, appCallbacks)).valueOr: + error "LogosDelivery initialization failed", error = error + return err("Failed setting up LogosDelivery: " & $error) - let wakuRes = (await Waku.new(wakuConf, appCallbacks)).valueOr: - error "waku initialization failed", error = error - return err("Failed setting up Waku: " & $error) + return ok(logosRes) - return ok(wakuRes) - -registerReqFFI(CreateNodeWithCallbacksRequest, ctx: ptr FFIContext[Waku]): +registerReqFFI(CreateNodeWithCallbacksRequest, ctx: ptr FFIContext[LogosDelivery]): proc( configJson: cstring, appCallbacks: AppCallbacks ): Future[Result[string, string]] {.async.} = @@ -70,7 +66,7 @@ registerReqFFI(CreateNodeWithCallbacksRequest, ctx: ptr FFIContext[Waku]): return ok("") proc waku_start( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = (await ctx.myLib[].start()).isOkOr: error "START_NODE failed", error = error @@ -78,7 +74,7 @@ proc waku_start( return ok("") proc waku_stop( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = (await ctx.myLib[].stop()).isOkOr: error "STOP_NODE failed", error = error diff --git a/library/kernel_api/peer_manager_api.nim b/library/kernel_api/peer_manager_api.nim index 775e58cae..38c15fc3d 100644 --- a/library/kernel_api/peer_manager_api.nim +++ b/library/kernel_api/peer_manager_api.nim @@ -12,41 +12,46 @@ type PeerInfo = object addresses: seq[string] proc waku_get_peerids_from_peerstore( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = ## returns a comma-separated string of peerIDs - let peerIDs = - ctx.myLib[].node.peerManager.switch.peerStore.peers().mapIt($it.peerId).join(",") + let peerIDs = ctx.myLib[].waku.node.peerManager.switch.peerStore + .peers() + .mapIt($it.peerId) + .join(",") return ok(peerIDs) proc waku_connect( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, peerMultiAddr: cstring, timeoutMs: cuint, ) {.ffi.} = let peers = ($peerMultiAddr).split(",").mapIt(strip(it)) - await ctx.myLib[].node.connectToNodes(peers, source = "static") + await ctx.myLib[].waku.node.connectToNodes(peers, source = "static") return ok("") proc waku_disconnect_peer_by_id( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer, peerId: cstring + ctx: ptr FFIContext[LogosDelivery], + callback: FFICallBack, + userData: pointer, + peerId: cstring, ) {.ffi.} = let pId = PeerId.init($peerId).valueOr: error "DISCONNECT_PEER_BY_ID failed", error = $error return err($error) - await ctx.myLib[].node.peerManager.disconnectNode(pId) + await ctx.myLib[].waku.node.peerManager.disconnectNode(pId) return ok("") proc waku_disconnect_all_peers( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - await ctx.myLib[].node.peerManager.disconnectAllPeers() + await ctx.myLib[].waku.node.peerManager.disconnectAllPeers() return ok("") proc waku_dial_peer( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, peerMultiAddr: cstring, @@ -56,7 +61,7 @@ proc waku_dial_peer( let remotePeerInfo = parsePeerInfo($peerMultiAddr).valueOr: error "DIAL_PEER failed", error = $error return err($error) - let conn = await ctx.myLib[].node.peerManager.dialPeer(remotePeerInfo, $protocol) + let conn = await ctx.myLib[].waku.node.peerManager.dialPeer(remotePeerInfo, $protocol) if conn.isNone(): let msg = "failed dialing peer" error "DIAL_PEER failed", error = msg, peerId = $remotePeerInfo.peerId @@ -64,7 +69,7 @@ proc waku_dial_peer( return ok("") proc waku_dial_peer_by_id( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, peerId: cstring, @@ -74,7 +79,7 @@ proc waku_dial_peer_by_id( let pId = PeerId.init($peerId).valueOr: error "DIAL_PEER_BY_ID failed", error = $error return err($error) - let conn = await ctx.myLib[].node.peerManager.dialPeer(pId, $protocol) + let conn = await ctx.myLib[].waku.node.peerManager.dialPeer(pId, $protocol) if conn.isNone(): let msg = "failed dialing peer" error "DIAL_PEER_BY_ID failed", error = msg, peerId = $peerId @@ -83,12 +88,12 @@ proc waku_dial_peer_by_id( return ok("") proc waku_get_connected_peers_info( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = ## returns a JSON string mapping peerIDs to objects with protocols and addresses var peersMap = initTable[string, PeerInfo]() - let peers = ctx.myLib[].node.peerManager.switch.peerStore.peers().filterIt( + let peers = ctx.myLib[].waku.node.peerManager.switch.peerStore.peers().filterIt( it.connectedness == Connected ) @@ -104,23 +109,23 @@ proc waku_get_connected_peers_info( return ok(jsonStr) proc waku_get_connected_peers( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = ## returns a comma-separated string of peerIDs let - (inPeerIds, outPeerIds) = ctx.myLib[].node.peerManager.connectedPeers() + (inPeerIds, outPeerIds) = ctx.myLib[].waku.node.peerManager.connectedPeers() connectedPeerids = concat(inPeerIds, outPeerIds) return ok(connectedPeerids.mapIt($it).join(",")) proc waku_get_peerids_by_protocol( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, protocol: cstring, ) {.ffi.} = ## returns a comma-separated string of peerIDs that mount the given protocol - let connectedPeers = ctx.myLib[].node.peerManager.switch.peerStore + let connectedPeers = ctx.myLib[].waku.node.peerManager.switch.peerStore .peers($protocol) .filterIt(it.connectedness == Connected) .mapIt($it.peerId) diff --git a/library/kernel_api/ping_api.nim b/library/kernel_api/ping_api.nim index da830bc5d..8313004d3 100644 --- a/library/kernel_api/ping_api.nim +++ b/library/kernel_api/ping_api.nim @@ -6,7 +6,7 @@ import library/declare_lib proc waku_ping_peer( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, peerAddr: cstring, @@ -18,12 +18,13 @@ proc waku_ping_peer( let timeout = chronos.milliseconds(timeoutMs) proc ping(): Future[Result[Duration, string]] {.async, gcsafe.} = try: - let conn = - await ctx.myLib[].node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec) + let conn = await ctx.myLib[].waku.node.switch.dial( + peerInfo.peerId, peerInfo.addrs, PingCodec + ) defer: await conn.close() - let pingRTT = await ctx.myLib[].node.libp2pPing.ping(conn) + let pingRTT = await ctx.myLib[].waku.node.libp2pPing.ping(conn) if pingRTT == 0.nanos: return err("could not ping peer: rtt-0") return ok(pingRTT) diff --git a/library/kernel_api/protocols/filter_api.nim b/library/kernel_api/protocols/filter_api.nim index 5e2628953..fb1c905ca 100644 --- a/library/kernel_api/protocols/filter_api.nim +++ b/library/kernel_api/protocols/filter_api.nim @@ -25,7 +25,7 @@ proc checkFilterClientMounted(waku: Waku): Result[string, string] = return ok("") proc waku_filter_subscribe( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, pubSubTopic: cstring, @@ -36,18 +36,18 @@ proc waku_filter_subscribe( callEventCallback(ctx, "onReceivedMessage"): $JsonMessageEvent.new(pubsubTopic, msg) - checkFilterClientMounted(ctx.myLib[]).isOkOr: + checkFilterClientMounted(ctx.myLib[].waku).isOkOr: return err($error) var filterPushEventCallback = FilterPushHandler(onReceivedMessage(ctx)) - ctx.myLib[].node.wakuFilterClient.registerPushHandler(filterPushEventCallback) + ctx.myLib[].waku.node.wakuFilterClient.registerPushHandler(filterPushEventCallback) - let peer = ctx.myLib[].node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: let errorMsg = "could not find peer with WakuFilterSubscribeCodec when subscribing" error "fail filter subscribe", error = errorMsg return err(errorMsg) - let subFut = ctx.myLib[].node.filterSubscribe( + let subFut = ctx.myLib[].waku.node.filterSubscribe( some(PubsubTopic($pubsubTopic)), ($contentTopics).split(",").mapIt(ContentTopic(it)), peer, @@ -61,22 +61,22 @@ proc waku_filter_subscribe( return ok("") proc waku_filter_unsubscribe( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, pubSubTopic: cstring, contentTopics: cstring, ) {.ffi.} = - checkFilterClientMounted(ctx.myLib[]).isOkOr: + checkFilterClientMounted(ctx.myLib[].waku).isOkOr: return err($error) - let peer = ctx.myLib[].node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: let errorMsg = "could not find peer with WakuFilterSubscribeCodec when unsubscribing" error "fail filter process", error = errorMsg return err(errorMsg) - let subFut = ctx.myLib[].node.filterUnsubscribe( + let subFut = ctx.myLib[].waku.node.filterUnsubscribe( some(PubsubTopic($pubsubTopic)), ($contentTopics).split(",").mapIt(ContentTopic(it)), peer, @@ -88,18 +88,18 @@ proc waku_filter_unsubscribe( return ok("") proc waku_filter_unsubscribe_all( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - checkFilterClientMounted(ctx.myLib[]).isOkOr: + checkFilterClientMounted(ctx.myLib[].waku).isOkOr: return err($error) - let peer = ctx.myLib[].node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: let errorMsg = "could not find peer with WakuFilterSubscribeCodec when unsubscribing all" error "fail filter unsubscribe all", error = errorMsg return err(errorMsg) - let unsubFut = ctx.myLib[].node.filterUnsubscribeAll(peer) + let unsubFut = ctx.myLib[].waku.node.filterUnsubscribeAll(peer) if not await unsubFut.withTimeout(FilterOpTimeout): let errorMsg = "filter un-subscription all timed out" diff --git a/library/kernel_api/protocols/lightpush_api.nim b/library/kernel_api/protocols/lightpush_api.nim index dc9197481..287e37aa5 100644 --- a/library/kernel_api/protocols/lightpush_api.nim +++ b/library/kernel_api/protocols/lightpush_api.nim @@ -13,13 +13,13 @@ import library/declare_lib proc waku_lightpush_publish( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, pubSubTopic: cstring, jsonWakuMessage: cstring, ) {.ffi.} = - if ctx.myLib[].node.wakuLightpushClient.isNil(): + if ctx.myLib[].waku.node.wakuLightpushClient.isNil(): let errorMsg = "LightpushRequest waku.node.wakuLightpushClient is nil" error "PUBLISH failed", error = errorMsg return err(errorMsg) @@ -35,14 +35,14 @@ proc waku_lightpush_publish( let msg = json_message_event.toWakuMessage(jsonMessage).valueOr: return err("Problem building the WakuMessage: " & $error) - let peerOpt = ctx.myLib[].node.peerManager.selectPeer(WakuLightPushCodec) + let peerOpt = ctx.myLib[].waku.node.peerManager.selectPeer(WakuLightPushCodec) if peerOpt.isNone(): let errorMsg = "failed to lightpublish message, no suitable remote peers" error "PUBLISH failed", error = errorMsg return err(errorMsg) let msgHashHex = ( - await ctx.myLib[].node.wakuLegacyLightpushClient.publish( + await ctx.myLib[].waku.node.wakuLegacyLightpushClient.publish( $pubsubTopic, msg, peer = peerOpt.get() ) ).valueOr: diff --git a/library/kernel_api/protocols/relay_api.nim b/library/kernel_api/protocols/relay_api.nim index ae7e29ceb..2195cdffd 100644 --- a/library/kernel_api/protocols/relay_api.nim +++ b/library/kernel_api/protocols/relay_api.nim @@ -15,54 +15,54 @@ import library/declare_lib proc waku_relay_get_peers_in_mesh( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, pubSubTopic: cstring, ) {.ffi.} = - let meshPeers = ctx.myLib[].node.wakuRelay.getPeersInMesh($pubsubTopic).valueOr: + let meshPeers = ctx.myLib[].waku.node.wakuRelay.getPeersInMesh($pubsubTopic).valueOr: error "LIST_MESH_PEERS failed", error = error return err($error) ## returns a comma-separated string of peerIDs return ok(meshPeers.mapIt($it).join(",")) proc waku_relay_get_num_peers_in_mesh( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, pubSubTopic: cstring, ) {.ffi.} = - let numPeersInMesh = ctx.myLib[].node.wakuRelay.getNumPeersInMesh($pubsubTopic).valueOr: + let numPeersInMesh = ctx.myLib[].waku.node.wakuRelay.getNumPeersInMesh($pubsubTopic).valueOr: error "NUM_MESH_PEERS failed", error = error return err($error) return ok($numPeersInMesh) proc waku_relay_get_connected_peers( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, pubSubTopic: cstring, ) {.ffi.} = ## Returns the list of all connected peers to an specific pubsub topic - let connPeers = ctx.myLib[].node.wakuRelay.getConnectedPeers($pubsubTopic).valueOr: + let connPeers = ctx.myLib[].waku.node.wakuRelay.getConnectedPeers($pubsubTopic).valueOr: error "LIST_CONNECTED_PEERS failed", error = error return err($error) ## returns a comma-separated string of peerIDs return ok(connPeers.mapIt($it).join(",")) proc waku_relay_get_num_connected_peers( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, pubSubTopic: cstring, ) {.ffi.} = - let numConnPeers = ctx.myLib[].node.wakuRelay.getNumConnectedPeers($pubsubTopic).valueOr: + let numConnPeers = ctx.myLib[].waku.node.wakuRelay.getNumConnectedPeers($pubsubTopic).valueOr: error "NUM_CONNECTED_PEERS failed", error = error return err($error) return ok($numConnPeers) proc waku_relay_add_protected_shard( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, clusterId: cint, @@ -73,7 +73,7 @@ proc waku_relay_add_protected_shard( try: let relayShard = RelayShard(clusterId: uint16(clusterId), shardId: uint16(shardId)) let protectedShard = ProtectedShard.parseCmdArg($relayShard & ":" & $publicKey) - ctx.myLib[].node.wakuRelay.addSignedShardsValidator( + ctx.myLib[].waku.node.wakuRelay.addSignedShardsValidator( @[protectedShard], uint16(clusterId) ) except ValueError as exc: @@ -82,20 +82,20 @@ proc waku_relay_add_protected_shard( return ok("") proc waku_relay_subscribe( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, pubSubTopic: cstring, ) {.ffi.} = echo "Subscribing to topic: " & $pubSubTopic & " ..." - proc onReceivedMessage(ctx: ptr FFIContext[Waku]): WakuRelayHandler = + proc onReceivedMessage(ctx: ptr FFIContext[LogosDelivery]): WakuRelayHandler = return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} = callEventCallback(ctx, "onReceivedMessage"): $JsonMessageEvent.new(pubsubTopic, msg) var cb = onReceivedMessage(ctx) - ctx.myLib[].node.subscribe( + ctx.myLib[].waku.node.subscribe( (kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic), handler = WakuRelayHandler(cb), ).isOkOr: @@ -104,19 +104,21 @@ proc waku_relay_subscribe( return ok("") proc waku_relay_unsubscribe( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, pubSubTopic: cstring, ) {.ffi.} = - ctx.myLib[].node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic)).isOkOr: + ctx.myLib[].waku.node.unsubscribe( + (kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic) + ).isOkOr: error "UNSUBSCRIBE failed", error = error return err($error) return ok("") proc waku_relay_publish( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, pubSubTopic: cstring, @@ -136,7 +138,7 @@ proc waku_relay_publish( let msg = json_message_event.toWakuMessage(jsonMessage).valueOr: return err("Problem building the WakuMessage: " & $error) - (await ctx.myLib[].node.wakuRelay.publish($pubsubTopic, msg)).isOkOr: + (await ctx.myLib[].waku.node.wakuRelay.publish($pubsubTopic, msg)).isOkOr: error "PUBLISH failed", error = error return err($error) @@ -144,13 +146,13 @@ proc waku_relay_publish( return ok(msgHash) proc waku_default_pubsub_topic( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic return ok(DefaultPubsubTopic) proc waku_content_topic( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, appName: cstring, @@ -163,7 +165,7 @@ proc waku_content_topic( return ok(fmt"/{$appName}/{$appVersion}/{$contentTopicName}/{$encoding}") proc waku_pubsub_topic( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, topicName: cstring, diff --git a/library/kernel_api/protocols/store_api.nim b/library/kernel_api/protocols/store_api.nim index 22d514d14..7083f93f2 100644 --- a/library/kernel_api/protocols/store_api.nim +++ b/library/kernel_api/protocols/store_api.nim @@ -68,7 +68,7 @@ func fromJsonNode(jsonContent: JsonNode): Result[StoreQueryRequest, string] = ) proc waku_store_query( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, jsonQuery: cstring, @@ -87,7 +87,7 @@ proc waku_store_query( return err("StoreRequest failed to parse peer addr: " & $error) let queryResponse = ( - await ctx.myLib[].node.wakuStoreClient.query(storeQueryRequest, peer) + await ctx.myLib[].waku.node.wakuStoreClient.query(storeQueryRequest, peer) ).valueOr: return err("StoreRequest failed store query: " & $error) diff --git a/library/liblogosdelivery.nim b/library/liblogosdelivery.nim index 08541634a..8dbcfac29 100644 --- a/library/liblogosdelivery.nim +++ b/library/liblogosdelivery.nim @@ -5,6 +5,7 @@ import logos_delivery/waku/waku_core/message/message, logos_delivery/waku/waku_core/topics/pubsub_topic, logos_delivery/waku/waku_relay, + logos_delivery, logos_delivery/waku/factory/waku, logos_delivery/waku/node/waku_node, logos_delivery/waku/node/health_monitor/health_status, @@ -46,7 +47,7 @@ proc waku_new( return nil ## Create the Waku thread that will keep waiting for req from the main thread. - var ctx = ffi.createFFIContext[Waku]().valueOr: + var ctx = ffi.createFFIContext[LogosDelivery]().valueOr: let msg = "Error in createFFIContext: " & $error callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return nil @@ -93,7 +94,7 @@ proc waku_new( return ctx proc waku_destroy( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ): cint {.dynlib, exportc, cdecl.} = initializeLibrary() checkParams(ctx, callback, userData) diff --git a/library/logos_delivery_api/debug_api.nim b/library/logos_delivery_api/debug_api.nim index e93467d84..98d48c97c 100644 --- a/library/logos_delivery_api/debug_api.nim +++ b/library/logos_delivery_api/debug_api.nim @@ -3,17 +3,17 @@ import logos_delivery/waku/factory/waku_state_info import tools/confutils/[cli_args, config_option_meta] proc logosdelivery_get_available_node_info_ids( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = ## Returns the list of all available node info item ids that ## can be queried with `get_node_info_item`. requireInitializedNode(ctx, "GetNodeInfoIds"): return err(errMsg) - return ok($ctx.myLib[].stateInfo.getAllPossibleInfoItemIds()) + return ok($ctx.myLib[].waku.stateInfo.getAllPossibleInfoItemIds()) proc logosdelivery_get_node_info( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, nodeInfoId: cstring, @@ -28,10 +28,10 @@ proc logosdelivery_get_node_info( except ValueError: return err("Invalid node info id: " & $nodeInfoId) - return ok(ctx.myLib[].stateInfo.getNodeInfoItem(infoItemIdEnum)) + return ok(ctx.myLib[].waku.stateInfo.getNodeInfoItem(infoItemIdEnum)) proc logosdelivery_get_available_configs( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = ## Returns information about the accepted config items. requireInitializedNode(ctx, "GetAvailableConfigs"): diff --git a/library/logos_delivery_api/messaging_api.nim b/library/logos_delivery_api/messaging_api.nim index ff7c7f6d0..3c86138b4 100644 --- a/library/logos_delivery_api/messaging_api.nim +++ b/library/logos_delivery_api/messaging_api.nim @@ -9,7 +9,7 @@ import ../declare_lib proc logosdelivery_subscribe( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, contentTopicStr: cstring, @@ -20,14 +20,14 @@ proc logosdelivery_subscribe( # ContentTopic is just a string type alias let contentTopic = ContentTopic($contentTopicStr) - (await api.subscribe(ctx.myLib[], contentTopic)).isOkOr: + (await api.subscribe(ctx.myLib[].waku, contentTopic)).isOkOr: let errMsg = $error return err("Subscribe failed: " & errMsg) return ok("") proc logosdelivery_unsubscribe( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, contentTopicStr: cstring, @@ -38,14 +38,14 @@ proc logosdelivery_unsubscribe( # ContentTopic is just a string type alias let contentTopic = ContentTopic($contentTopicStr) - api.unsubscribe(ctx.myLib[], contentTopic).isOkOr: + api.unsubscribe(ctx.myLib[].waku, contentTopic).isOkOr: let errMsg = $error return err("Unsubscribe failed: " & errMsg) return ok("") proc logosdelivery_send( - ctx: ptr FFIContext[Waku], + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer, messageJson: cstring, @@ -83,8 +83,8 @@ proc logosdelivery_send( contentTopic = contentTopic, payload = payload, ephemeral = ephemeral ) - # Send the message - let requestId = (await api.send(ctx.myLib[], envelope)).valueOr: + # Send the message via the messaging layer's own API. + let requestId = (await ctx.myLib[].messagingClient.send(envelope)).valueOr: let errMsg = $error return err("Send failed: " & errMsg) diff --git a/library/logos_delivery_api/node_api.nim b/library/logos_delivery_api/node_api.nim index 58785e80d..f6a0945ec 100644 --- a/library/logos_delivery_api/node_api.nim +++ b/library/logos_delivery_api/node_api.nim @@ -1,7 +1,7 @@ import std/json import chronos, chronicles, results, ffi import - logos_delivery/waku/factory/waku, + logos_delivery, logos_delivery/waku/node/waku_node, logos_delivery/waku/api/[api, types], logos_delivery/waku/events/[message_events, health_events], @@ -13,14 +13,14 @@ import proc `%`*(id: RequestId): JsonNode = %($id) -registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]): +registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[LogosDelivery]): proc(configJson: cstring): Future[Result[string, string]] {.async.} = let conf = parseNodeConfFromJson($configJson).valueOr: error "Failed to assemble WakuNodeConf from JSON", error = error, configJson = $configJson return err("failed parseNodeConfFromJson " & error) - ctx.myLib[] = (await api.createNode(conf)).valueOr: + ctx.myLib[] = (await LogosDelivery.new(conf)).valueOr: let errMsg = $error chronicles.error "CreateNodeRequest failed", err = errMsg return err(errMsg) @@ -28,7 +28,7 @@ registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]): return ok("") proc logosdelivery_destroy( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ): cint {.dynlib, exportc, cdecl.} = initializeLibrary() checkParams(ctx, callback, userData) @@ -52,7 +52,7 @@ proc logosdelivery_create_node( echo "error: missing callback in logosdelivery_create_node" return nil - var ctx = ffi.createFFIContext[Waku]().valueOr: + var ctx = ffi.createFFIContext[LogosDelivery]().valueOr: let msg = "Error in createFFIContext: " & $error callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return nil @@ -73,14 +73,14 @@ proc logosdelivery_create_node( return ctx proc logosdelivery_start_node( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = requireInitializedNode(ctx, "START_NODE"): return err(errMsg) # setting up outgoing event listeners let sentListener = MessageSentEvent.listen( - ctx.myLib[].brokerCtx, + ctx.myLib[].waku.brokerCtx, proc(event: MessageSentEvent) {.async: (raises: []).} = callEventCallback(ctx, "onMessageSent"): $newJsonEvent("message_sent", event), @@ -89,7 +89,7 @@ proc logosdelivery_start_node( return err("MessageSentEvent.listen failed: " & $error) let errorListener = MessageErrorEvent.listen( - ctx.myLib[].brokerCtx, + ctx.myLib[].waku.brokerCtx, proc(event: MessageErrorEvent) {.async: (raises: []).} = callEventCallback(ctx, "onMessageError"): $newJsonEvent("message_error", event), @@ -98,7 +98,7 @@ proc logosdelivery_start_node( return err("MessageErrorEvent.listen failed: " & $error) let propagatedListener = MessagePropagatedEvent.listen( - ctx.myLib[].brokerCtx, + ctx.myLib[].waku.brokerCtx, proc(event: MessagePropagatedEvent) {.async: (raises: []).} = callEventCallback(ctx, "onMessagePropagated"): $newJsonEvent("message_propagated", event), @@ -107,7 +107,7 @@ proc logosdelivery_start_node( return err("MessagePropagatedEvent.listen failed: " & $error) let receivedListener = MessageReceivedEvent.listen( - ctx.myLib[].brokerCtx, + ctx.myLib[].waku.brokerCtx, proc(event: MessageReceivedEvent) {.async: (raises: []).} = callEventCallback(ctx, "onMessageReceived"): $newJsonEvent("message_received", event), @@ -116,7 +116,7 @@ proc logosdelivery_start_node( return err("MessageReceivedEvent.listen failed: " & $error) let ConnectionStatusChangeListener = EventConnectionStatusChange.listen( - ctx.myLib[].brokerCtx, + ctx.myLib[].waku.brokerCtx, proc(event: EventConnectionStatusChange) {.async: (raises: []).} = callEventCallback(ctx, "onConnectionStatusChange"): $newJsonEvent("connection_status_change", event), @@ -124,16 +124,6 @@ proc logosdelivery_start_node( chronicles.error "ConnectionStatusChange.listen failed", err = $error return err("ConnectionStatusChange.listen failed: " & $error) - ctx.myLib[].mountMessagingClient().isOkOr: - let errMsg = $error - chronicles.error "mountMessagingClient failed", error = errMsg - return err("failed to mount messaging: " & errMsg) - - ctx.myLib[].mountReliableChannelManager().isOkOr: - let errMsg = $error - chronicles.error "mountReliableChannelManager failed", err = errMsg - return err("failed to mount reliable channel manager: " & errMsg) - (await ctx.myLib[].start()).isOkOr: let errMsg = $error chronicles.error "START_NODE failed", err = errMsg @@ -141,16 +131,16 @@ proc logosdelivery_start_node( return ok("") proc logosdelivery_stop_node( - ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = requireInitializedNode(ctx, "STOP_NODE"): return err(errMsg) - await MessageErrorEvent.dropAllListeners(ctx.myLib[].brokerCtx) - await MessageSentEvent.dropAllListeners(ctx.myLib[].brokerCtx) - await MessagePropagatedEvent.dropAllListeners(ctx.myLib[].brokerCtx) - await MessageReceivedEvent.dropAllListeners(ctx.myLib[].brokerCtx) - await EventConnectionStatusChange.dropAllListeners(ctx.myLib[].brokerCtx) + await MessageErrorEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx) + await MessageSentEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx) + await MessagePropagatedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx) + await MessageReceivedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx) + await EventConnectionStatusChange.dropAllListeners(ctx.myLib[].waku.brokerCtx) (await ctx.myLib[].stop()).isOkOr: let errMsg = $error diff --git a/logos_delivery.nim b/logos_delivery.nim index c4edbe6d6..bab1d3e24 100644 --- a/logos_delivery.nim +++ b/logos_delivery.nim @@ -1,10 +1,15 @@ -## Main module for using nwaku as a Nimble library +## Package entry point for using Logos Messaging as a Nimble library. ## -## This module re-exports the public API for creating and managing Waku nodes -## when using nwaku as a library dependency. +## This root module is a thin aggregator, following the standard Nimble layout: +## the implementation lives under `./logos_delivery/`, and importing the package +## name re-exports `LogosDelivery` together with every per-layer public API. +## +## See `logos_delivery/logos_delivery.nim` for `LogosDelivery`, the pure +## concentrator that owns one instance of each API layer +## +## Waku <- MessagingClient <- ReliableChannelManager +## +## and drives their shared `new` / `start` / `stop` lifecycle. -import logos_delivery/waku/api -export api - -import logos_delivery/waku/factory/waku -export waku +import ./logos_delivery/logos_delivery as logos_delivery_impl +export logos_delivery_impl diff --git a/logos_delivery/channels/reliable_channel_manager.nim b/logos_delivery/channels/reliable_channel_manager.nim index 0717eb3f8..887444b19 100644 --- a/logos_delivery/channels/reliable_channel_manager.nim +++ b/logos_delivery/channels/reliable_channel_manager.nim @@ -15,6 +15,7 @@ import brokers/broker_context import logos_delivery/waku/events/message_events as waku_message_events import logos_delivery/messaging/messaging_client +import logos_delivery/waku/api/types import logos_delivery/waku/waku_core/topics import logos_delivery/waku/persistency/sds_persistency @@ -27,30 +28,43 @@ const SdsJobId = "sds" ## One persistency job shared by every channel's SDS state; rows are ## keyed by channelId. -type ReliableChannelManager* = ref object - channels: Table[ChannelId, ReliableChannel] - messagingClient: MessagingClient ## Borrowed from the owning `Waku`. - sendHandler: SendHandler - ## Default egress dispatch for channels created through this manager. - ## Constructed at mount time as a closure over `MessagingClient.send` - ## so the channel layer itself stays callable-only. - brokerCtx: BrokerContext +type + ReliableChannelManagerConf* = object + ## Per-layer config object for the reliable + ## channel API. Placeholder for now (segmentation / SDS / rate-limit defaults + ## will move here in a follow-up PR); kept so each layer owns its own config. + + ReliableChannelManager* = ref object + channels: Table[ChannelId, ReliableChannel] + messagingClient: MessagingClient ## The channel layer chains onto messaging. + sendHandler: SendHandler + ## Default egress dispatch for channels created through this manager. + ## Built in `new` as a closure over `MessagingClient.send` so the channel + ## layer itself stays callable-only. + brokerCtx: BrokerContext proc new*( T: type ReliableChannelManager, + conf: ReliableChannelManagerConf, messagingClient: MessagingClient, - sendHandler: SendHandler, brokerCtx: BrokerContext = globalBrokerContext(), ): Result[T, string] = + ## The reliable channel layer chains onto the messaging layer: its default + ## egress is `MessagingClient.send`, wrapped here so callers never wire the + ## handler themselves. if messagingClient.isNil(): return err("messaging client is required") - if sendHandler.isNil(): - return err("sendHandler is required") + + let defaultSendHandler: SendHandler = proc( + envelope: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + return await messagingClient.send(envelope) + return ok( T( channels: initTable[ChannelId, ReliableChannel](), messagingClient: messagingClient, - sendHandler: sendHandler, + sendHandler: defaultSendHandler, brokerCtx: brokerCtx, ) ) diff --git a/logos_delivery/logos_delivery.nim b/logos_delivery/logos_delivery.nim new file mode 100644 index 000000000..f01cf4207 --- /dev/null +++ b/logos_delivery/logos_delivery.nim @@ -0,0 +1,104 @@ +## `LogosDelivery` is the project entry point. It is a pure concentrator: it +## owns exactly one instance of each API layer +## +## Waku <- MessagingClient <- ReliableChannelManager +## +## and chains them together (each layer drives the one below it). Every layer +## keeps its own, separate public API — `LogosDelivery` only wires them up and +## drives the shared `new` / `start` / `stop` lifecycle. + +{.push raises: [].} + +import results, chronos, chronicles + +import logos_delivery/waku/api +export api +import logos_delivery/waku/factory/waku +export waku +import logos_delivery/messaging/messaging_client +export messaging_client +import logos_delivery/channels/reliable_channel_manager +export reliable_channel_manager + +import logos_delivery/waku/factory/waku_conf +import logos_delivery/waku/factory/app_callbacks +import logos_delivery/waku/api/[api_conf, types] + +logScope: + topics = "logosdelivery" + +type + LogosDeliveryConf* = object + ## Aggregates the per-layer config objects. For now + ## the sub-configs are derived from `WakuConf`; richer per-layer configuration + ## (and how it is sourced) lands in a follow-up PR. + waku*: WakuConf + messaging*: MessagingClientConf + reliableChannel*: ReliableChannelManagerConf + + LogosDelivery* = ref object ## Entry point. Holds one instance of each API layer. + waku*: Waku + messagingClient*: MessagingClient + reliableChannelManager*: ReliableChannelManager + +proc init*(T: type LogosDeliveryConf, wakuConf: WakuConf): LogosDeliveryConf = + ## Builds the aggregated config from a `WakuConf`. The messaging / reliable + ## channel layers carry trivial config today; this is the seam where their + ## dedicated config will be threaded through later. + LogosDeliveryConf( + waku: wakuConf, + messaging: MessagingClientConf(useP2PReliability: wakuConf.p2pReliability), + reliableChannel: ReliableChannelManagerConf(), + ) + +proc new*( + T: type LogosDelivery, conf: WakuNodeConf, appCallbacks: AppCallbacks = nil +): Future[Result[LogosDelivery, string]] {.async.} = + ## Single entry point, from the CLI configuration type. Derives the aggregated + ## per-layer config, then creates the full stack bottom-up so each layer can + ## chain onto the one below. + let wakuConf = conf.toWakuConf().valueOr: + return err("failed to handle the configuration: " & error) + let layerConf = LogosDeliveryConf.init(wakuConf) + + let waku = (await Waku.new(layerConf.waku, appCallbacks)).valueOr: + return err("failed to create Waku: " & error) + + let messagingClient = MessagingClient.new(layerConf.messaging, waku.node).valueOr: + return err("failed to create MessagingClient: " & error) + + let reliableChannelManager = ReliableChannelManager.new( + layerConf.reliableChannel, messagingClient, waku.brokerCtx + ).valueOr: + return err("failed to create ReliableChannelManager: " & error) + + return ok( + T( + waku: waku, + messagingClient: messagingClient, + reliableChannelManager: reliableChannelManager, + ) + ) + +proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} = + ## Starts each layer bottom-up: transport first, then messaging, then channels. + (await self.waku.start()).isOkOr: + return err("failed to start Waku: " & error) + + self.messagingClient.start().isOkOr: + return err("failed to start MessagingClient: " & error) + + self.reliableChannelManager.start().isOkOr: + return err("failed to start ReliableChannelManager: " & error) + + return ok() + +proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} = + ## Stops in reverse order so higher layers drain before their dependencies. + await self.reliableChannelManager.stop() + await self.messagingClient.stop() + + (await self.waku.stop()).isOkOr: + return err("failed to stop Waku: " & error) + + return ok() diff --git a/logos_delivery/messaging/messaging_client.nim b/logos_delivery/messaging/messaging_client.nim index 1d3892250..652671e46 100644 --- a/logos_delivery/messaging/messaging_client.nim +++ b/logos_delivery/messaging/messaging_client.nim @@ -6,16 +6,25 @@ import logos_delivery/messaging/delivery_service/[recv_service, send_service], logos_delivery/messaging/delivery_service/send_service/delivery_task -type MessagingClient* = ref object - node: WakuNode - sendService*: SendService - recvService*: RecvService - started: bool +type + MessagingClientConf* = object + ## Per-layer config object for the messaging API. + ## Kept intentionally minimal for now; the full config surface lands in a + ## follow-up PR. Today it only carries the p2p reliability toggle. + useP2PReliability*: bool + + MessagingClient* = ref object + node: WakuNode + sendService*: SendService + recvService*: RecvService + started: bool proc new*( - T: type MessagingClient, useP2PReliability: bool, node: WakuNode + T: type MessagingClient, conf: MessagingClientConf, node: WakuNode ): Result[T, string] = - let sendService = ?SendService.new(useP2PReliability, node) + ## The messaging layer chains onto Waku: it drives the underlying + ## `WakuNode` (Waku's core) for transport while exposing its own send/recv API. + let sendService = ?SendService.new(conf.useP2PReliability, node) let recvService = RecvService.new(node) ok(T(node: node, sendService: sendService, recvService: recvService)) diff --git a/logos_delivery/waku/api/api.nim b/logos_delivery/waku/api/api.nim index 5be1e2086..0ffc38dc8 100644 --- a/logos_delivery/waku/api/api.nim +++ b/logos_delivery/waku/api/api.nim @@ -4,9 +4,7 @@ import std/[net, options] import chronicles, chronos, libp2p/peerid, results import logos_delivery/waku/factory/waku -import logos_delivery/messaging/messaging_client import logos_delivery/waku/[requests/health_requests, waku_core, waku_node] -import logos_delivery/messaging/delivery_service/send_service import logos_delivery/waku/node/subscription_manager import libp2p/peerid import tools/confutils/cli_args @@ -48,9 +46,3 @@ proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] = ?checkApiAvailability(w) return w.node.subscriptionManager.unsubscribe(contentTopic) - -proc send*( - w: Waku, envelope: MessageEnvelope -): Future[Result[RequestId, string]] {.async.} = - ?checkApiAvailability(w) - return await w.messagingClient.send(envelope) diff --git a/logos_delivery/waku/factory/waku.nim b/logos_delivery/waku/factory/waku.nim index e15a92fd0..a797b8294 100644 --- a/logos_delivery/waku/factory/waku.nim +++ b/logos_delivery/waku/factory/waku.nim @@ -49,8 +49,6 @@ import factory/app_callbacks, persistency/persistency, ], - logos_delivery/channels/reliable_channel_manager, - logos_delivery/messaging/messaging_client, ./waku_conf, ./waku_state_info @@ -76,10 +74,6 @@ type Waku* = ref object healthMonitor*: NodeHealthMonitor - messagingClient*: MessagingClient - - reliableChannelManager*: ReliableChannelManager - restServer*: WakuRestServerRef metricsServer*: MetricsHttpServerRef appCallbacks*: AppCallbacks @@ -384,35 +378,6 @@ proc startDnsDiscoveryRetryLoop(waku: Waku): Future[void] {.async.} = error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg() return -proc mountMessagingClient*(waku: Waku): Result[void, string] = - if not waku.messagingClient.isNil(): - return err("messaging client already mounted") - if waku.node.started: - return err("cannot mount messaging client on a started node") - waku.messagingClient = MessagingClient.new(waku.conf.p2pReliability, waku.node).valueOr: - return err("could not create messaging client: " & $error) - return ok() - -proc mountReliableChannelManager*(waku: Waku): Result[void, string] = - if not waku.reliableChannelManager.isNil(): - return err("reliable channel manager already mounted") - if waku.messagingClient.isNil(): - return err("reliable channel manager requires a mounted messaging client") - if waku.node.started: - return err("cannot mount reliable channel manager on a started node") - - let messagingClient = waku.messagingClient - let defaultSendHandler: SendHandler = proc( - envelope: MessageEnvelope - ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = - return await messagingClient.send(envelope) - - waku.reliableChannelManager = ReliableChannelManager.new( - messagingClient, defaultSendHandler, waku.brokerCtx - ).valueOr: - return err("could not create reliable channel manager: " & $error) - return ok() - proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = if waku.node.started: warn "start: waku node already started" @@ -565,14 +530,6 @@ proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = ) waku.healthMonitor.setOverallHealth(HealthStatus.READY) - if not waku.messagingClient.isNil(): - waku.messagingClient.start().isOkOr: - return err("failed to start messaging client: " & $error) - - if not waku.reliableChannelManager.isNil(): - waku.reliableChannelManager.start().isOkOr: - return err("failed to start reliable channel manager: " & $error) - return ok() proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = @@ -590,12 +547,6 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = if not waku.wakuDiscv5.isNil(): await waku.wakuDiscv5.stop() - if not waku.reliableChannelManager.isNil(): - await waku.reliableChannelManager.stop() - - if not waku.messagingClient.isNil(): - await waku.messagingClient.stop() - if not waku.node.isNil(): await waku.node.stop() diff --git a/tests/api/test_api_health.nim b/tests/api/test_api_health.nim index d8a7eabf2..83efedd18 100644 --- a/tests/api/test_api_health.nim +++ b/tests/api/test_api_health.nim @@ -74,7 +74,7 @@ proc waitForShardHealthy( suite "LM API health checking": var serviceNode {.threadvar.}: WakuNode - client {.threadvar.}: Waku + client {.threadvar.}: LogosDelivery servicePeerInfo {.threadvar.}: RemotePeerInfo asyncSetup: @@ -102,9 +102,7 @@ suite "LM API health checking": conf.numShardsInNetwork = 1 conf.rest = false - client = (await createNode(conf)).valueOr: - raiseAssert error - client.mountMessagingClient().isOkOr: + client = (await LogosDelivery.new(conf)).valueOr: raiseAssert error (await client.start()).isOkOr: raiseAssert error @@ -114,13 +112,13 @@ suite "LM API health checking": await serviceNode.stop() asyncTest "RequestShardTopicsHealth, check PubsubTopic health": - client.node.wakuRelay.subscribe(DefaultShard, dummyHandler) - await client.node.connectToNodes(@[servicePeerInfo]) + client.waku.node.wakuRelay.subscribe(DefaultShard, dummyHandler) + await client.waku.node.connectToNodes(@[servicePeerInfo]) var isHealthy = false let start = Moment.now() while Moment.now() - start < TestTimeout: - let req = RequestShardTopicsHealth.request(client.brokerCtx, @[DefaultShard]).valueOr: + let req = RequestShardTopicsHealth.request(client.waku.brokerCtx, @[DefaultShard]).valueOr: raiseAssert "RequestShardTopicsHealth failed" if req.topicHealth.len > 0: @@ -134,22 +132,22 @@ suite "LM API health checking": asyncTest "RequestShardTopicsHealth, check disconnected PubsubTopic": const GhostShard = PubsubTopic("/waku/2/rs/1/666") - client.node.wakuRelay.subscribe(GhostShard, dummyHandler) + client.waku.node.wakuRelay.subscribe(GhostShard, dummyHandler) - let req = RequestShardTopicsHealth.request(client.brokerCtx, @[GhostShard]).valueOr: + let req = RequestShardTopicsHealth.request(client.waku.brokerCtx, @[GhostShard]).valueOr: raiseAssert "Request failed" check req.topicHealth.len > 0 check req.topicHealth[0].health == TopicHealth.UNHEALTHY asyncTest "RequestProtocolHealth, check relay status": - await client.node.connectToNodes(@[servicePeerInfo]) + await client.waku.node.connectToNodes(@[servicePeerInfo]) var isReady = false let start = Moment.now() while Moment.now() - start < TestTimeout: let relayReq = await RequestProtocolHealth.request( - client.brokerCtx, WakuProtocol.RelayProtocol + client.waku.brokerCtx, WakuProtocol.RelayProtocol ) if relayReq.isOk() and relayReq.get().healthStatus.health == HealthStatus.READY: isReady = true @@ -158,14 +156,16 @@ suite "LM API health checking": check isReady == true - let storeReq = - await RequestProtocolHealth.request(client.brokerCtx, WakuProtocol.StoreProtocol) + let storeReq = await RequestProtocolHealth.request( + client.waku.brokerCtx, WakuProtocol.StoreProtocol + ) if storeReq.isOk(): check storeReq.get().healthStatus.health != HealthStatus.READY asyncTest "RequestProtocolHealth, check unmounted protocol": - let req = - await RequestProtocolHealth.request(client.brokerCtx, WakuProtocol.StoreProtocol) + let req = await RequestProtocolHealth.request( + client.waku.brokerCtx, WakuProtocol.StoreProtocol + ) check req.isOk() let status = req.get().healthStatus @@ -173,16 +173,16 @@ suite "LM API health checking": check status.desc.isNone() asyncTest "RequestConnectionStatus, check connectivity state": - let initialReq = RequestConnectionStatus.request(client.brokerCtx).valueOr: + let initialReq = RequestConnectionStatus.request(client.waku.brokerCtx).valueOr: raiseAssert "RequestConnectionStatus failed" check initialReq.connectionStatus == ConnectionStatus.Disconnected - await client.node.connectToNodes(@[servicePeerInfo]) + await client.waku.node.connectToNodes(@[servicePeerInfo]) var isConnected = false let start = Moment.now() while Moment.now() - start < TestTimeout: - let req = RequestConnectionStatus.request(client.brokerCtx).valueOr: + let req = RequestConnectionStatus.request(client.waku.brokerCtx).valueOr: raiseAssert "RequestConnectionStatus failed" if req.connectionStatus == ConnectionStatus.PartiallyConnected or @@ -194,29 +194,30 @@ suite "LM API health checking": check isConnected == true asyncTest "EventConnectionStatusChange, detect connect and disconnect": - let connectFuture = - waitForConnectionStatus(client.brokerCtx, ConnectionStatus.PartiallyConnected) + let connectFuture = waitForConnectionStatus( + client.waku.brokerCtx, ConnectionStatus.PartiallyConnected + ) - await client.node.connectToNodes(@[servicePeerInfo]) + await client.waku.node.connectToNodes(@[servicePeerInfo]) await connectFuture let disconnectFuture = - waitForConnectionStatus(client.brokerCtx, ConnectionStatus.Disconnected) - await client.node.disconnectNode(servicePeerInfo) + waitForConnectionStatus(client.waku.brokerCtx, ConnectionStatus.Disconnected) + await client.waku.node.disconnectNode(servicePeerInfo) await disconnectFuture asyncTest "EventShardTopicHealthChange, detect health improvement": - client.node.wakuRelay.subscribe(DefaultShard, dummyHandler) + client.waku.node.wakuRelay.subscribe(DefaultShard, dummyHandler) - let healthEventFuture = waitForShardHealthy(client.brokerCtx) + let healthEventFuture = waitForShardHealthy(client.waku.brokerCtx) - await client.node.connectToNodes(@[servicePeerInfo]) + await client.waku.node.connectToNodes(@[servicePeerInfo]) let event = await healthEventFuture check event.topic == DefaultShard asyncTest "RequestHealthReport, check aggregate report": - let req = await RequestHealthReport.request(client.brokerCtx) + let req = await RequestHealthReport.request(client.waku.brokerCtx) check req.isOk() @@ -228,7 +229,8 @@ suite "LM API health checking": asyncTest "RequestContentTopicsHealth, smoke test": let fictionalTopic = ContentTopic("/waku/2/this-does-not-exist/proto") - let req = RequestContentTopicsHealth.request(client.brokerCtx, @[fictionalTopic]) + let req = + RequestContentTopicsHealth.request(client.waku.brokerCtx, @[fictionalTopic]) check req.isOk() @@ -241,20 +243,20 @@ suite "LM API health checking": let cTopic = ContentTopic("/waku/2/my-content-topic/proto") let shardReq = - RequestRelayShard.request(client.brokerCtx, none(PubsubTopic), cTopic) + RequestRelayShard.request(client.waku.brokerCtx, none(PubsubTopic), cTopic) check shardReq.isOk() let targetShard = $shardReq.get().relayShard - client.node.wakuRelay.subscribe(targetShard, dummyHandler) + client.waku.node.wakuRelay.subscribe(targetShard, dummyHandler) serviceNode.wakuRelay.subscribe(targetShard, dummyHandler) - await client.node.connectToNodes(@[servicePeerInfo]) + await client.waku.node.connectToNodes(@[servicePeerInfo]) var isHealthy = false let start = Moment.now() while Moment.now() - start < TestTimeout: - let req = RequestContentTopicsHealth.request(client.brokerCtx, @[cTopic]).valueOr: + let req = RequestContentTopicsHealth.request(client.waku.brokerCtx, @[cTopic]).valueOr: raiseAssert "Request failed" if req.contentTopicHealth.len > 0: @@ -268,7 +270,7 @@ suite "LM API health checking": check isHealthy == true asyncTest "RequestProtocolHealth, edge mode smoke test": - var edgeWaku: Waku + var edgeWaku: LogosDelivery lockNewGlobalBrokerContext: var edgeConf = defaultWakuNodeConf().valueOr: @@ -281,20 +283,18 @@ suite "LM API health checking": edgeConf.maxMessageSize = "150 KiB" edgeConf.rest = false - edgeWaku = (await createNode(edgeConf)).valueOr: + edgeWaku = (await LogosDelivery.new(edgeConf)).valueOr: raiseAssert "Failed to create edge node: " & error - edgeWaku.mountMessagingClient().isOkOr: - raiseAssert "Failed to mount edge messaging: " & error (await edgeWaku.start()).isOkOr: raiseAssert "Failed to start edge waku: " & error let relayReq = await RequestProtocolHealth.request( - edgeWaku.brokerCtx, WakuProtocol.RelayProtocol + edgeWaku.waku.brokerCtx, WakuProtocol.RelayProtocol ) check relayReq.isOk() check relayReq.get().healthStatus.health == HealthStatus.NOT_MOUNTED - check not edgeWaku.node.wakuFilterClient.isNil() + check not edgeWaku.waku.node.wakuFilterClient.isNil() discard await edgeWaku.stop() diff --git a/tests/api/test_api_receive.nim b/tests/api/test_api_receive.nim index 362ee1847..d9eb02427 100644 --- a/tests/api/test_api_receive.nim +++ b/tests/api/test_api_receive.nim @@ -98,7 +98,7 @@ proc createApiNodeConf(numShards: uint16 = 1): WakuNodeConf = type TestNetwork = ref object storeNode: WakuNode publisher: WakuNode - subscriber: Waku + subscriber: LogosDelivery storeNodePeerInfo: RemotePeerInfo missedPayload: seq[byte] @@ -158,12 +158,11 @@ proc setupNetwork(testTopic: ContentTopic): Future[TestNetwork] {.async.} = # subscriber: created before the publish so the message timestamp lands after # its RecvService startTimeToCheck watermark - var subscriber: Waku + var subscriber: LogosDelivery lockNewGlobalBrokerContext: - subscriber = (await createNode(createApiNodeConf(numShards))).expect( + subscriber = (await LogosDelivery.new(createApiNodeConf(numShards))).expect( "Failed to create subscriber" ) - subscriber.mountMessagingClient().expect("Failed to mount messaging") (await subscriber.start()).expect("Failed to start subscriber") # publish while the subscriber is offline: the message reaches the archive but @@ -188,7 +187,7 @@ proc setupNetwork(testTopic: ContentTopic): Future[TestNetwork] {.async.} = raiseAssert "Message was not archived in time" # subscribe to the content topic; with no peers yet the subscriber stays offline - (await subscriber.subscribe(testTopic)).expect("Failed to subscribe") + (await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe") return TestNetwork( storeNode: storeNode, @@ -217,11 +216,11 @@ suite "Messaging API, Receive Service (store recovery)": defer: await net.teardown() - let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: await eventManager.teardown() - await net.subscriber.node.connectToNodes(@[net.storeNodePeerInfo]) + await net.subscriber.waku.node.connectToNodes(@[net.storeNodePeerInfo]) await net.subscriber.messagingClient.recvService.checkStore() check await eventManager.waitForEvents(TestTimeout) @@ -236,15 +235,15 @@ suite "Messaging API, Receive Service (store recovery)": defer: await net.teardown() - let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: await eventManager.teardown() # sync on coming online (the transition that fires the backfill) before asserting let onlineFut = waitForConnectionStatus( - net.subscriber.brokerCtx, ConnectionStatus.PartiallyConnected + net.subscriber.waku.brokerCtx, ConnectionStatus.PartiallyConnected ) - await net.subscriber.node.connectToNodes(@[net.storeNodePeerInfo]) + await net.subscriber.waku.node.connectToNodes(@[net.storeNodePeerInfo]) await onlineFut check await eventManager.waitForEvents(TestTimeout) diff --git a/tests/api/test_api_send.nim b/tests/api/test_api_send.nim index e51473b17..5c75f2e4a 100644 --- a/tests/api/test_api_send.nim +++ b/tests/api/test_api_send.nim @@ -237,12 +237,10 @@ suite "Waku API - Send": ) asyncTest "Check API availability (unhealthy node)": - var node: Waku + var node: LogosDelivery lockNewGlobalBrokerContext: - node = (await createNode(createApiNodeConf())).valueOr: + node = (await LogosDelivery.new(createApiNodeConf())).valueOr: raiseAssert error - node.mountMessagingClient().isOkOr: - raiseAssert "Failed to mount messaging: " & error (await node.start()).isOkOr: raiseAssert "Failed to start Waku node: " & error # node is not connected ! @@ -251,7 +249,7 @@ suite "Waku API - Send": ContentTopic("/waku/2/default-content/proto"), "test payload" ) - let sendResult = await node.send(envelope) + let sendResult = await node.messagingClient.send(envelope) # TODO: The API is not enforcing a health check before the send, # so currently this test cannot successfully fail to send. @@ -261,20 +259,18 @@ suite "Waku API - Send": raiseAssert "Failed to stop node: " & error asyncTest "Send fully validated": - var node: Waku + var node: LogosDelivery lockNewGlobalBrokerContext: - node = (await createNode(createApiNodeConf())).valueOr: + node = (await LogosDelivery.new(createApiNodeConf())).valueOr: raiseAssert error - node.mountMessagingClient().isOkOr: - raiseAssert "Failed to mount messaging: " & error (await node.start()).isOkOr: raiseAssert "Failed to start Waku node: " & error - await node.node.connectToNodes( + await node.waku.node.connectToNodes( @[relayNode1PeerInfo, lightpushNodePeerInfo, storeNodePeerInfo] ) - let eventManager = newSendEventListenerManager(node.brokerCtx) + let eventManager = newSendEventListenerManager(node.waku.brokerCtx) defer: await eventManager.teardown() @@ -282,7 +278,7 @@ suite "Waku API - Send": ContentTopic("/waku/2/default-content/proto"), "test payload" ) - let requestId = (await node.send(envelope)).valueOr: + let requestId = (await node.messagingClient.send(envelope)).valueOr: raiseAssert error # Wait for events with timeout @@ -297,18 +293,16 @@ suite "Waku API - Send": raiseAssert "Failed to stop node: " & error asyncTest "Send only propagates": - var node: Waku + var node: LogosDelivery lockNewGlobalBrokerContext: - node = (await createNode(createApiNodeConf())).valueOr: + node = (await LogosDelivery.new(createApiNodeConf())).valueOr: raiseAssert error - node.mountMessagingClient().isOkOr: - raiseAssert "Failed to mount messaging: " & error (await node.start()).isOkOr: raiseAssert "Failed to start Waku node: " & error - await node.node.connectToNodes(@[relayNode1PeerInfo]) + await node.waku.node.connectToNodes(@[relayNode1PeerInfo]) - let eventManager = newSendEventListenerManager(node.brokerCtx) + let eventManager = newSendEventListenerManager(node.waku.brokerCtx) defer: await eventManager.teardown() @@ -316,7 +310,7 @@ suite "Waku API - Send": ContentTopic("/waku/2/default-content/proto"), "test payload" ) - let requestId = (await node.send(envelope)).valueOr: + let requestId = (await node.messagingClient.send(envelope)).valueOr: raiseAssert error # Wait for events with timeout @@ -329,18 +323,16 @@ suite "Waku API - Send": raiseAssert "Failed to stop node: " & error asyncTest "Send only propagates fallback to lightpush": - var node: Waku + var node: LogosDelivery lockNewGlobalBrokerContext: - node = (await createNode(createApiNodeConf())).valueOr: + node = (await LogosDelivery.new(createApiNodeConf())).valueOr: raiseAssert error - node.mountMessagingClient().isOkOr: - raiseAssert "Failed to mount messaging: " & error (await node.start()).isOkOr: raiseAssert "Failed to start Waku node: " & error - await node.node.connectToNodes(@[lightpushNodePeerInfo]) + await node.waku.node.connectToNodes(@[lightpushNodePeerInfo]) - let eventManager = newSendEventListenerManager(node.brokerCtx) + let eventManager = newSendEventListenerManager(node.waku.brokerCtx) defer: await eventManager.teardown() @@ -348,7 +340,7 @@ suite "Waku API - Send": ContentTopic("/waku/2/default-content/proto"), "test payload" ) - let requestId = (await node.send(envelope)).valueOr: + let requestId = (await node.messagingClient.send(envelope)).valueOr: raiseAssert error # Wait for events with timeout @@ -361,18 +353,16 @@ suite "Waku API - Send": raiseAssert "Failed to stop node: " & error asyncTest "Send fully validates fallback to lightpush": - var node: Waku + var node: LogosDelivery lockNewGlobalBrokerContext: - node = (await createNode(createApiNodeConf())).valueOr: + node = (await LogosDelivery.new(createApiNodeConf())).valueOr: raiseAssert error - node.mountMessagingClient().isOkOr: - raiseAssert "Failed to mount messaging: " & error (await node.start()).isOkOr: raiseAssert "Failed to start Waku node: " & error - await node.node.connectToNodes(@[lightpushNodePeerInfo, storeNodePeerInfo]) + await node.waku.node.connectToNodes(@[lightpushNodePeerInfo, storeNodePeerInfo]) - let eventManager = newSendEventListenerManager(node.brokerCtx) + let eventManager = newSendEventListenerManager(node.waku.brokerCtx) defer: await eventManager.teardown() @@ -380,7 +370,7 @@ suite "Waku API - Send": ContentTopic("/waku/2/default-content/proto"), "test payload" ) - let requestId = (await node.send(envelope)).valueOr: + let requestId = (await node.messagingClient.send(envelope)).valueOr: raiseAssert error # Wait for events with timeout @@ -417,18 +407,16 @@ suite "Waku API - Send": ).isOkOr: raiseAssert "Failed to subscribe fakeLightpushNode: " & error - var node: Waku + var node: LogosDelivery lockNewGlobalBrokerContext: - node = (await createNode(createApiNodeConf(cli_args.WakuMode.Edge))).valueOr: + node = (await LogosDelivery.new(createApiNodeConf(cli_args.WakuMode.Edge))).valueOr: raiseAssert error - node.mountMessagingClient().isOkOr: - raiseAssert "Failed to mount messaging: " & error (await node.start()).isOkOr: raiseAssert "Failed to start Waku node: " & error - await node.node.connectToNodes(@[fakeLightpushNodePeerInfo]) + await node.waku.node.connectToNodes(@[fakeLightpushNodePeerInfo]) - let eventManager = newSendEventListenerManager(node.brokerCtx) + let eventManager = newSendEventListenerManager(node.waku.brokerCtx) defer: await eventManager.teardown() @@ -436,7 +424,7 @@ suite "Waku API - Send": ContentTopic("/waku/2/default-content/proto"), "test payload" ) - let requestId = (await node.send(envelope)).valueOr: + let requestId = (await node.messagingClient.send(envelope)).valueOr: raiseAssert error echo "Sent message with requestId=", requestId diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim index 2a6ce36ee..1169a7366 100644 --- a/tests/api/test_api_subscription.nim +++ b/tests/api/test_api_subscription.nim @@ -64,7 +64,7 @@ proc waitForEvents( type TestNetwork = ref object publisher: WakuNode # Relay node that publishes messages in tests. meshBuddy: WakuNode # Extra relay peer for publisher's mesh (Edge tests only). - subscriber: Waku + subscriber: LogosDelivery # The receiver node in tests. Edge node in edge tests, Core node in relay tests. publisherPeerInfo: RemotePeerInfo @@ -83,11 +83,10 @@ proc createApiNodeConf( conf.rest = false result = conf -proc setupSubscriberNode(conf: WakuNodeConf): Future[Waku] {.async.} = - var node: Waku +proc setupSubscriberNode(conf: WakuNodeConf): Future[LogosDelivery] {.async.} = + var node: LogosDelivery lockNewGlobalBrokerContext: - node = (await createNode(conf)).expect("Failed to create subscriber node") - node.mountMessagingClient().expect("Failed to mount messaging") + node = (await LogosDelivery.new(conf)).expect("Failed to create subscriber node") (await node.start()).expect("Failed to start subscriber node") return node @@ -141,7 +140,7 @@ proc setupNetwork( net.subscriber = await setupSubscriberNode(createApiNodeConf(mode, numShards)) - await net.subscriber.node.connectToNodes(@[net.publisherPeerInfo]) + await net.subscriber.waku.node.connectToNodes(@[net.publisherPeerInfo]) return net @@ -171,28 +170,30 @@ proc waitForMesh(node: WakuNode, shard: PubsubTopic) {.async.} = await sleepAsync(100.milliseconds) raise newException(ValueError, "GossipSub Mesh failed to stabilize on " & shard) -proc waitForEdgeSubs(w: Waku, shard: PubsubTopic) {.async.} = +proc waitForEdgeSubs(w: LogosDelivery, shard: PubsubTopic) {.async.} = let deadline = Moment.now() + EdgeWaitTimeout while Moment.now() < deadline: - if w.node.subscriptionManager.edgeFilterPeerCount(shard) > 0: + if w.waku.node.subscriptionManager.edgeFilterPeerCount(shard) > 0: return await sleepAsync(100.milliseconds) raise newException(ValueError, "Edge filter subscription failed on " & shard) -proc edgePeersReached(w: Waku, shard: PubsubTopic, n: int): Future[bool] {.async.} = +proc edgePeersReached( + w: LogosDelivery, shard: PubsubTopic, n: int +): Future[bool] {.async.} = let deadline = Moment.now() + EdgeWaitTimeout while Moment.now() < deadline: - if w.node.subscriptionManager.edgeFilterPeerCount(shard) >= n: + if w.waku.node.subscriptionManager.edgeFilterPeerCount(shard) >= n: return true await sleepAsync(100.milliseconds) return false proc edgePeersDroppedBelow( - w: Waku, shard: PubsubTopic, n: int + w: LogosDelivery, shard: PubsubTopic, n: int ): Future[bool] {.async.} = let deadline = Moment.now() + EdgeWaitTimeout while Moment.now() < deadline: - if w.node.subscriptionManager.edgeFilterPeerCount(shard) < n: + if w.waku.node.subscriptionManager.edgeFilterPeerCount(shard) < n: return true await sleepAsync(100.milliseconds) return false @@ -201,7 +202,7 @@ proc publishToMesh( net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte] ): Future[Result[int, string]] {.async.} = # Publishes a message from "publisher" via relay into the gossipsub mesh. - let shard = net.subscriber.node.getRelayShard(contentTopic) + let shard = net.subscriber.waku.node.getRelayShard(contentTopic) await waitForMesh(net.publisher, shard) let msg = WakuMessage( payload: payload, contentTopic: contentTopic, version: 0, timestamp: now() @@ -213,7 +214,7 @@ proc publishToMeshAfterEdgeReady( ): Future[Result[int, string]] {.async.} = # First, ensure "subscriber" node (an edge node) is subscribed and ready to receive. # Afterwards, "publisher" (relay node) sends the message in the gossipsub network. - let shard = net.subscriber.node.getRelayShard(contentTopic) + let shard = net.subscriber.waku.node.getRelayShard(contentTopic) await waitForEdgeSubs(net.subscriber, shard) return await net.publishToMesh(contentTopic, payload) @@ -224,11 +225,11 @@ suite "Messaging API, SubscriptionManager": await net.teardown() let testTopic = ContentTopic("/waku/2/test-content/proto") - (await net.subscriber.subscribe(testTopic)).expect( + (await net.subscriber.waku.subscribe(testTopic)).expect( "subscriberNode failed to subscribe" ) - let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: await eventManager.teardown() @@ -247,9 +248,9 @@ suite "Messaging API, SubscriptionManager": let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto") let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto") - (await net.subscriber.subscribe(subbedTopic)).expect("failed to subscribe") + (await net.subscriber.waku.subscribe(subbedTopic)).expect("failed to subscribe") - let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: await eventManager.teardown() @@ -267,10 +268,10 @@ suite "Messaging API, SubscriptionManager": let testTopic = ContentTopic("/waku/2/unsub-test/proto") - (await net.subscriber.subscribe(testTopic)).expect("failed to subscribe") - net.subscriber.unsubscribe(testTopic).expect("failed to unsubscribe") + (await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe") + net.subscriber.waku.unsubscribe(testTopic).expect("failed to unsubscribe") - let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: await eventManager.teardown() @@ -288,14 +289,14 @@ suite "Messaging API, SubscriptionManager": let topicA = ContentTopic("/waku/2/topic-a/proto") let topicB = ContentTopic("/waku/2/topic-b/proto") - (await net.subscriber.subscribe(topicA)).expect("failed to sub A") - (await net.subscriber.subscribe(topicB)).expect("failed to sub B") + (await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A") + (await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B") - let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: await eventManager.teardown() - net.subscriber.unsubscribe(topicA).expect("failed to unsub A") + net.subscriber.waku.unsubscribe(topicA).expect("failed to unsub A") discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect( "Publish A failed" @@ -314,11 +315,11 @@ suite "Messaging API, SubscriptionManager": let glitchTopic = ContentTopic("/waku/2/glitch/proto") - (await net.subscriber.subscribe(glitchTopic)).expect("failed to sub") - (await net.subscriber.subscribe(glitchTopic)).expect("failed to double sub") - net.subscriber.unsubscribe(glitchTopic).expect("failed to unsub") + (await net.subscriber.waku.subscribe(glitchTopic)).expect("failed to sub") + (await net.subscriber.waku.subscribe(glitchTopic)).expect("failed to double sub") + net.subscriber.waku.unsubscribe(glitchTopic).expect("failed to unsub") - let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: await eventManager.teardown() @@ -337,9 +338,9 @@ suite "Messaging API, SubscriptionManager": let testTopic = ContentTopic("/waku/2/resub-test/proto") # Subscribe - (await net.subscriber.subscribe(testTopic)).expect("Initial sub failed") + (await net.subscriber.waku.subscribe(testTopic)).expect("Initial sub failed") - var eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + var eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) discard (await net.publishToMesh(testTopic, "Msg 1".toBytes())).expect("Pub 1 failed") @@ -347,8 +348,8 @@ suite "Messaging API, SubscriptionManager": await eventManager.teardown() # Unsubscribe and verify teardown - net.subscriber.unsubscribe(testTopic).expect("Unsub failed") - eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + net.subscriber.waku.unsubscribe(testTopic).expect("Unsub failed") + eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) discard (await net.publishToMesh(testTopic, "Ghost".toBytes())).expect("Ghost pub failed") @@ -357,8 +358,8 @@ suite "Messaging API, SubscriptionManager": await eventManager.teardown() # Resubscribe - (await net.subscriber.subscribe(testTopic)).expect("Resub failed") - eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + (await net.subscriber.waku.subscribe(testTopic)).expect("Resub failed") + eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) discard (await net.publishToMesh(testTopic, "Msg 2".toBytes())).expect("Pub 2 failed") @@ -376,15 +377,15 @@ suite "Messaging API, SubscriptionManager": # generate two content topics that land in two different shards var i = 0 - while net.subscriber.node.getRelayShard(topicA) == - net.subscriber.node.getRelayShard(topicB): + while net.subscriber.waku.node.getRelayShard(topicA) == + net.subscriber.waku.node.getRelayShard(topicB): topicB = ContentTopic("/appB" & $i & "/2/shard-test-b/proto") inc i - (await net.subscriber.subscribe(topicA)).expect("failed to sub A") - (await net.subscriber.subscribe(topicB)).expect("failed to sub B") + (await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A") + (await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B") - let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 2) + let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 2) defer: await eventManager.teardown() @@ -411,7 +412,7 @@ suite "Messaging API, SubscriptionManager": proc verifyNetworkState(expected: seq[ContentTopic]) {.async.} = let eventManager = - newReceiveEventListenerManager(net.subscriber.brokerCtx, expected.len) + newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, expected.len) for topic in allTopics: discard (await net.publishToMesh(topic, "Stress Payload".toBytes())).expect( @@ -439,7 +440,7 @@ suite "Messaging API, SubscriptionManager": # subscribe to all content topics we generated for t in allTopics: - (await net.subscriber.subscribe(t)).expect("sub failed") + (await net.subscriber.waku.subscribe(t)).expect("sub failed") activeSubs.add(t) await verifyNetworkState(activeSubs) @@ -447,7 +448,7 @@ suite "Messaging API, SubscriptionManager": # unsubscribe from some content topics for i in 0 ..< 50: let t = allTopics[i] - net.subscriber.unsubscribe(t).expect("unsub failed") + net.subscriber.waku.unsubscribe(t).expect("unsub failed") let idx = activeSubs.find(t) if idx >= 0: @@ -458,7 +459,7 @@ suite "Messaging API, SubscriptionManager": # re-subscribe to some content topics for i in 0 ..< 25: let t = allTopics[i] - (await net.subscriber.subscribe(t)).expect("resub failed") + (await net.subscriber.waku.subscribe(t)).expect("resub failed") activeSubs.add(t) await verifyNetworkState(activeSubs) @@ -469,9 +470,9 @@ suite "Messaging API, SubscriptionManager": await net.teardown() let testTopic = ContentTopic("/waku/2/test-content/proto") - (await net.subscriber.subscribe(testTopic)).expect("failed to subscribe") + (await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe") - let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: await eventManager.teardown() @@ -490,9 +491,9 @@ suite "Messaging API, SubscriptionManager": let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto") let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto") - (await net.subscriber.subscribe(subbedTopic)).expect("failed to subscribe") + (await net.subscriber.waku.subscribe(subbedTopic)).expect("failed to subscribe") - let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: await eventManager.teardown() @@ -510,10 +511,10 @@ suite "Messaging API, SubscriptionManager": let testTopic = ContentTopic("/waku/2/unsub-test/proto") - (await net.subscriber.subscribe(testTopic)).expect("failed to subscribe") - net.subscriber.unsubscribe(testTopic).expect("failed to unsubscribe") + (await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe") + net.subscriber.waku.unsubscribe(testTopic).expect("failed to unsubscribe") - let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: await eventManager.teardown() @@ -531,17 +532,17 @@ suite "Messaging API, SubscriptionManager": let topicA = ContentTopic("/waku/2/topic-a/proto") let topicB = ContentTopic("/waku/2/topic-b/proto") - (await net.subscriber.subscribe(topicA)).expect("failed to sub A") - (await net.subscriber.subscribe(topicB)).expect("failed to sub B") + (await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A") + (await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B") - let shard = net.subscriber.node.getRelayShard(topicA) + let shard = net.subscriber.waku.node.getRelayShard(topicA) await waitForEdgeSubs(net.subscriber, shard) - let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: await eventManager.teardown() - net.subscriber.unsubscribe(topicA).expect("failed to unsub A") + net.subscriber.waku.unsubscribe(topicA).expect("failed to unsub A") discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect( "Publish A failed" @@ -560,9 +561,9 @@ suite "Messaging API, SubscriptionManager": let testTopic = ContentTopic("/waku/2/resub-test/proto") - (await net.subscriber.subscribe(testTopic)).expect("Initial sub failed") + (await net.subscriber.waku.subscribe(testTopic)).expect("Initial sub failed") - var eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + var eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 1".toBytes())).expect( "Pub 1 failed" ) @@ -570,8 +571,8 @@ suite "Messaging API, SubscriptionManager": require await eventManager.waitForEvents(TestTimeout) await eventManager.teardown() - net.subscriber.unsubscribe(testTopic).expect("Unsub failed") - eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + net.subscriber.waku.unsubscribe(testTopic).expect("Unsub failed") + eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) discard (await net.publishToMesh(testTopic, "Ghost".toBytes())).expect("Ghost pub failed") @@ -579,8 +580,8 @@ suite "Messaging API, SubscriptionManager": check not await eventManager.waitForEvents(NegativeTestTimeout) await eventManager.teardown() - (await net.subscriber.subscribe(testTopic)).expect("Resub failed") - eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + (await net.subscriber.waku.subscribe(testTopic)).expect("Resub failed") + eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 2".toBytes())).expect( "Pub 2 failed" @@ -640,19 +641,19 @@ suite "Messaging API, SubscriptionManager": await meshBuddy.connectToNodes(@[publisherPeerInfo]) let conf = createApiNodeConf(cli_args.WakuMode.Edge, numShards) - var subscriber: Waku + var subscriber: LogosDelivery lockNewGlobalBrokerContext: - subscriber = (await createNode(conf)).expect("Failed to create edge subscriber") - subscriber.mountMessagingClient().expect("Failed to mount messaging") + subscriber = + (await LogosDelivery.new(conf)).expect("Failed to create edge subscriber") (await subscriber.start()).expect("Failed to start edge subscriber") # Connect edge subscriber to both filter servers so selectPeers finds both - await subscriber.node.connectToNodes(@[publisherPeerInfo, meshBuddyPeerInfo]) + await subscriber.waku.node.connectToNodes(@[publisherPeerInfo, meshBuddyPeerInfo]) let testTopic = ContentTopic("/waku/2/failover-test/proto") - let shard = subscriber.node.getRelayShard(testTopic) + let shard = subscriber.waku.node.getRelayShard(testTopic) - (await subscriber.subscribe(testTopic)).expect("Failed to subscribe") + (await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe") # Wait for dialing both filter servers (HealthyThreshold = 2) check await edgePeersReached(subscriber, shard, 2) @@ -660,7 +661,7 @@ suite "Messaging API, SubscriptionManager": # Verify message delivery with both servers alive await waitForMesh(publisher, shard) - var eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1) + var eventManager = newReceiveEventListenerManager(subscriber.waku.brokerCtx, 1) let msg1 = WakuMessage( payload: "Before failover".toBytes(), contentTopic: testTopic, @@ -674,14 +675,14 @@ suite "Messaging API, SubscriptionManager": await eventManager.teardown() # Disconnect meshBuddy from edge (keeps relay mesh alive for publishing) - await subscriber.node.disconnectNode(meshBuddyPeerInfo) + await subscriber.waku.node.disconnectNode(meshBuddyPeerInfo) # Wait for the dead peer to be pruned check await edgePeersDroppedBelow(subscriber, shard, 2) - check subscriber.node.subscriptionManager.edgeFilterPeerCount(shard) >= 1 + check subscriber.waku.node.subscriptionManager.edgeFilterPeerCount(shard) >= 1 # Verify messages still arrive through the surviving filter server (publisher) - eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1) + eventManager = newReceiveEventListenerManager(subscriber.waku.brokerCtx, 1) let msg2 = WakuMessage( payload: "After failover".toBytes(), contentTopic: testTopic, @@ -769,33 +770,33 @@ suite "Messaging API, SubscriptionManager": await sparePeer.connectToNodes(@[publisherPeerInfo]) let conf = createApiNodeConf(cli_args.WakuMode.Edge, numShards) - var subscriber: Waku + var subscriber: LogosDelivery lockNewGlobalBrokerContext: - subscriber = (await createNode(conf)).expect("Failed to create edge subscriber") - subscriber.mountMessagingClient().expect("Failed to mount messaging") + subscriber = + (await LogosDelivery.new(conf)).expect("Failed to create edge subscriber") (await subscriber.start()).expect("Failed to start edge subscriber") - await subscriber.node.connectToNodes( + await subscriber.waku.node.connectToNodes( @[publisherPeerInfo, meshBuddyPeerInfo, sparePeerInfo] ) let testTopic = ContentTopic("/waku/2/replacement-test/proto") - let shard = subscriber.node.getRelayShard(testTopic) + let shard = subscriber.waku.node.getRelayShard(testTopic) - (await subscriber.subscribe(testTopic)).expect("Failed to subscribe") + (await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe") # Wait for 2 confirmed peers (HealthyThreshold). The 3rd is available but not dialed. check await edgePeersReached(subscriber, shard, 2) - require subscriber.node.subscriptionManager.edgeFilterPeerCount(shard) == 2 + require subscriber.waku.node.subscriptionManager.edgeFilterPeerCount(shard) == 2 - await subscriber.node.disconnectNode(meshBuddyPeerInfo) + await subscriber.waku.node.disconnectNode(meshBuddyPeerInfo) # Wait for the sub loop to detect the loss and dial a replacement check await edgePeersReached(subscriber, shard, 2) await waitForMesh(publisher, shard) - var eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1) + var eventManager = newReceiveEventListenerManager(subscriber.waku.brokerCtx, 1) let msg = WakuMessage( payload: "After replacement".toBytes(), contentTopic: testTopic, diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index ca6177e9d..881c22b3a 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -53,14 +53,12 @@ suite "Reliable Channel - ingress": contentTopic = ContentTopic("/reliable-channel/test/proto") let appPayload = "hello reliable channel".toBytes() - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager ## Noop encryption providers so the Encrypt/Decrypt brokers have @@ -121,14 +119,12 @@ suite "Reliable Channel - ingress": contentTopic = ContentTopic("/reliable-channel/test/proto") let appPayload = "foreign payload".toBytes() - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -180,14 +176,12 @@ suite "Reliable Channel - send state machine": contentTopic = ContentTopic("/reliable-channel/test/sm-success") fakeMsgReqId = RequestId("fake-msg-req-1") - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -247,14 +241,12 @@ suite "Reliable Channel - send state machine": channelId = ChannelId("sm-multi-channel") contentTopic = ContentTopic("/reliable-channel/test/sm-multi") - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -350,14 +342,12 @@ suite "Reliable Channel - send state machine": channelId = ChannelId("sm-race-channel") contentTopic = ContentTopic("/reliable-channel/test/sm-race") - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -459,12 +449,10 @@ suite "Reliable Channel - SDS persistence": Persistency.reset() removeDir(root) - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager lockNewGlobalBrokerContext: - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -529,14 +517,12 @@ suite "Reliable Channel - SDS lifecycle": let payload1 = "first message".toBytes() let payload2 = "second message".toBytes() - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -602,14 +588,12 @@ suite "Reliable Channel - SDS lifecycle": contentTopic = ContentTopic("/reliable-channel/test/dup") let appPayload = "deliver once".toBytes() - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -661,14 +645,12 @@ suite "Reliable Channel - SDS lifecycle": channelId = ChannelId("sds-foreign-channel") contentTopic = ContentTopic("/reliable-channel/test/foreign") - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -723,14 +705,12 @@ suite "Reliable Channel - SDS lifecycle": Persistency.reset() removeDir(root) - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -807,14 +787,12 @@ suite "Reliable Channel - SDS protocol semantics": channelId = ChannelId("sds-semantics-channel") contentTopic = ContentTopic("/reliable-channel/test/semantics") - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -877,14 +855,12 @@ suite "Reliable Channel - SDS protocol semantics": Persistency.reset() removeDir(root) - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -967,14 +943,12 @@ suite "Reliable Channel - SDS protocol semantics": let payloads = @["chain first".toBytes(), "chain second".toBytes(), "chain third".toBytes()] - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -1044,14 +1018,12 @@ suite "Reliable Channel - SDS protocol semantics": contentTopic = ContentTopic("/reliable-channel/test/sync") let appPayload = "real message".toBytes() - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -1119,14 +1091,12 @@ suite "Reliable Channel - SDS protocol semantics": contentTopic = ContentTopic("/reliable-channel/test/unique-id") let appPayload = "ok".toBytes() - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager setNoopEncryption() @@ -1189,12 +1159,10 @@ suite "Reliable Channel - SDS protocol semantics": (await waku.stop()).expect("stop") asyncTest "manager rejects operations on unknown channels": - var waku: Waku + var waku: LogosDelivery var manager: ReliableChannelManager lockNewGlobalBrokerContext: - waku = (await createNode(createApiNodeConf())).expect("createNode") - waku.mountMessagingClient().expect("mountMessagingClient") - waku.mountReliableChannelManager().expect("mountReliableChannelManager") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") manager = waku.reliableChannelManager check (await manager.send(ChannelId("no-such-channel"), "x".toBytes())).isErr() diff --git a/tests/node/test_wakunode_health_monitor.nim b/tests/node/test_wakunode_health_monitor.nim index 480cb235c..04a39455a 100644 --- a/tests/node/test_wakunode_health_monitor.nim +++ b/tests/node/test_wakunode_health_monitor.nim @@ -228,8 +228,9 @@ suite "Health Monitor - events": nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata") await nodeA.start() - let ds = - MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient") + let ds = MessagingClient + .new(MessagingClientConf(useP2PReliability: false), nodeA) + .expect("Failed to create MessagingClient") ds.start().expect("Failed to start MessagingClient") let monitorA = NodeHealthMonitor.new(nodeA) @@ -332,8 +333,9 @@ suite "Health Monitor - events": nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata") await nodeA.start() - let ds = - MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient") + let ds = MessagingClient + .new(MessagingClientConf(useP2PReliability: false), nodeA) + .expect("Failed to create MessagingClient") ds.start().expect("Failed to start MessagingClient") let subMgr = nodeA.subscriptionManager