diff --git a/library/kernel_api/debug_node_api.nim b/library/kernel_api/debug_node_api.nim index 44764a673..7d39935c6 100644 --- a/library/kernel_api/debug_node_api.nim +++ b/library/kernel_api/debug_node_api.nim @@ -1,53 +1,46 @@ -import std/json -import - chronicles, - chronos, - results, - eth/p2p/discoveryv5/enr, - strutils, - libp2p/peerid, - metrics, - ffi -import - logos_delivery/waku/waku, - logos_delivery/waku/node/waku_node, - logos_delivery/waku/node/health_monitor, - library/declare_lib - -proc getMultiaddresses(node: WakuNode): seq[string] = - return node.info().listenAddresses - -proc getMetrics(): string = - {.gcsafe.}: - return defaultRegistry.toText() ## defaultRegistry is {.global.} in metrics module +import std/strutils +import chronos, results, ffi +import logos_delivery, library/declare_lib proc waku_version( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - return ok(WakuNodeVersionString) + let v = (await ctx.myLib[].waku.version()).valueOr: + return err(error) + return ok(v) proc waku_listen_addresses( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = ## returns a comma-separated string of the listen addresses - return ok(ctx.myLib[].waku.node.getMultiaddresses().join(",")) + let addrs = (await ctx.myLib[].waku.listenAddresses()).valueOr: + return err(error) + return ok(addrs.join(",")) proc waku_get_my_enr( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - return ok(ctx.myLib[].waku.node.enr.toURI()) + let enrUri = (await ctx.myLib[].waku.myEnr()).valueOr: + return err(error) + return ok(enrUri) proc waku_get_my_peerid( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - return ok($ctx.myLib[].waku.node.peerId()) + let peerId = (await ctx.myLib[].waku.myPeerId()).valueOr: + return err(error) + return ok(peerId) proc waku_get_metrics( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - return ok(getMetrics()) + let m = (await ctx.myLib[].waku.metrics()).valueOr: + return err(error) + return ok(m) proc waku_is_online( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - return ok($ctx.myLib[].waku.healthMonitor.onlineMonitor.amIOnline()) + let online = (await ctx.myLib[].waku.isOnline()).valueOr: + return err(error) + return ok($online) diff --git a/library/kernel_api/discovery_api.nim b/library/kernel_api/discovery_api.nim index 158c3a925..98c83e42d 100644 --- a/library/kernel_api/discovery_api.nim +++ b/library/kernel_api/discovery_api.nim @@ -1,42 +1,6 @@ -import logos_delivery/waku/compat/option_valueor -import std/json -import chronos, chronicles, results, strutils, libp2p/multiaddress, ffi -import - logos_delivery/waku/waku, - logos_delivery/waku/discovery/waku_dnsdisc, - logos_delivery/waku/discovery/waku_discv5, - logos_delivery/waku/waku_core/peers, - logos_delivery/waku/waku_node, - library/declare_lib - -proc retrieveBootstrapNodes( - enrTreeUrl: string, ipDnsServer: string -): Future[Result[seq[string], string]] {.async.} = - let dnsNameServers = @[parseIpAddress(ipDnsServer)] - let discoveredPeers: seq[RemotePeerInfo] = ( - await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers) - ).valueOr: - return err("failed discovering peers from DNS: " & $error) - - var multiAddresses = newSeq[string]() - - for discPeer in discoveredPeers: - for address in discPeer.addrs: - multiAddresses.add($address & "/p2p/" & $discPeer) - - return ok(multiAddresses) - -proc updateDiscv5BootstrapNodes(nodes: string, waku: Waku): Result[void, string] = - waku.wakuDiscv5.updateBootstrapRecords(nodes).isOkOr: - return err("error in updateDiscv5BootstrapNodes: " & $error) - return ok() - -proc performPeerExchangeRequestTo*( - numPeers: uint64, waku: Waku -): Future[Result[int, string]] {.async.} = - let numPeersRecv = (await waku.node.fetchPeerExchangePeers(numPeers)).valueOr: - return err($error) - return ok(numPeersRecv) +import std/strutils +import chronos, chronicles, results, ffi +import logos_delivery, library/declare_lib proc waku_discv5_update_bootnodes( ctx: ptr FFIContext[LogosDelivery], @@ -46,11 +10,9 @@ proc waku_discv5_update_bootnodes( ) {.ffi.} = ## Updates the bootnode list used for discovering new peers via DiscoveryV5 ## bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]` - - updateDiscv5BootstrapNodes($bootnodes, ctx.myLib[].waku).isOkOr: + (await ctx.myLib[].waku.discv5UpdateBootnodes($bootnodes)).isOkOr: error "UPDATE_DISCV5_BOOTSTRAP_NODES failed", error = error - return err($error) - + return err(error) return ok("discovery request processed correctly") proc waku_dns_discovery( @@ -61,26 +23,28 @@ proc waku_dns_discovery( nameDnsServer: cstring, timeoutMs: cint, ) {.ffi.} = - let nodes = (await retrieveBootstrapNodes($enrTreeUrl, $nameDnsServer)).valueOr: + let nodes = ( + await ctx.myLib[].waku.dnsDiscovery($enrTreeUrl, $nameDnsServer, int(timeoutMs)) + ).valueOr: error "GET_BOOTSTRAP_NODES failed", error = error - return err($error) - + return err(error) ## returns a comma-separated string of bootstrap nodes' multiaddresses return ok(nodes.join(",")) proc waku_start_discv5( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - (await ctx.myLib[].waku.wakuDiscv5.start()).isOkOr: + (await ctx.myLib[].waku.startDiscv5()).isOkOr: error "START_DISCV5 failed", error = error - return err("error starting discv5: " & $error) - + return err(error) return ok("discv5 started correctly") proc waku_stop_discv5( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - await ctx.myLib[].waku.wakuDiscv5.stop() + (await ctx.myLib[].waku.stopDiscv5()).isOkOr: + error "STOP_DISCV5 failed", error = error + return err(error) return ok("discv5 stopped correctly") proc waku_peer_exchange_request( @@ -89,8 +53,7 @@ proc waku_peer_exchange_request( userData: pointer, numPeers: uint64, ) {.ffi.} = - let numValidPeers = (await performPeerExchangeRequestTo(numPeers, ctx.myLib[].waku)).valueOr: + let numValidPeers = (await ctx.myLib[].waku.peerExchangeRequest(numPeers)).valueOr: error "waku_peer_exchange_request failed", error = error - return err("failed peer exchange: " & $error) - + return err(error) return ok($numValidPeers) diff --git a/library/kernel_api/peer_manager_api.nim b/library/kernel_api/peer_manager_api.nim index eeea2c63f..e14b8b2c9 100644 --- a/library/kernel_api/peer_manager_api.nim +++ b/library/kernel_api/peer_manager_api.nim @@ -1,11 +1,6 @@ -import logos_delivery/waku/compat/option_valueor -import std/[sequtils, strutils, tables] -import chronicles, chronos, results, options, json, ffi -import - logos_delivery/waku/waku, - logos_delivery/waku/node/waku_node, - logos_delivery/waku/node/peer_manager, - library/declare_lib +import std/[strutils, tables, json] +import chronicles, chronos, results, ffi +import logos_delivery, library/declare_lib type PeerInfo = object protocols: seq[string] @@ -15,11 +10,9 @@ proc waku_get_peerids_from_peerstore( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = ## returns a comma-separated string of peerIDs - let peerIDs = ctx.myLib[].waku.node.peerManager.switch.peerStore - .peers() - .mapIt($it.peerId) - .join(",") - return ok(peerIDs) + let peerIds = (await ctx.myLib[].waku.peerIdsFromPeerstore()).valueOr: + return err(error) + return ok(peerIds.join(",")) proc waku_connect( ctx: ptr FFIContext[LogosDelivery], @@ -28,8 +21,9 @@ proc waku_connect( peerMultiAddr: cstring, timeoutMs: cuint, ) {.ffi.} = - let peers = ($peerMultiAddr).split(",").mapIt(strip(it)) - await ctx.myLib[].waku.node.connectToNodes(peers, source = "static") + let peers = ($peerMultiAddr).split(",") + (await ctx.myLib[].waku.connect(peers, uint32(timeoutMs))).isOkOr: + return err(error) return ok("") proc waku_disconnect_peer_by_id( @@ -38,16 +32,16 @@ proc waku_disconnect_peer_by_id( userData: pointer, peerId: cstring, ) {.ffi.} = - let pId = PeerId.init($peerId).valueOr: - error "DISCONNECT_PEER_BY_ID failed", error = $error - return err($error) - await ctx.myLib[].waku.node.peerManager.disconnectNode(pId) + (await ctx.myLib[].waku.disconnectPeerById($peerId)).isOkOr: + error "DISCONNECT_PEER_BY_ID failed", error = error + return err(error) return ok("") proc waku_disconnect_all_peers( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - await ctx.myLib[].waku.node.peerManager.disconnectAllPeers() + (await ctx.myLib[].waku.disconnectAllPeers()).isOkOr: + return err(error) return ok("") proc waku_dial_peer( @@ -58,14 +52,9 @@ proc waku_dial_peer( protocol: cstring, timeoutMs: cuint, ) {.ffi.} = - let remotePeerInfo = parsePeerInfo($peerMultiAddr).valueOr: - error "DIAL_PEER failed", error = $error - return err($error) - let conn = await ctx.myLib[].waku.node.peerManager.dialPeer(remotePeerInfo, $protocol) - if conn.isNone(): - let msg = "failed dialing peer" - error "DIAL_PEER failed", error = msg, peerId = $remotePeerInfo.peerId - return err(msg) + (await ctx.myLib[].waku.dialPeer($peerMultiAddr, $protocol, int(timeoutMs))).isOkOr: + error "DIAL_PEER failed", error = error + return err(error) return ok("") proc waku_dial_peer_by_id( @@ -76,47 +65,32 @@ proc waku_dial_peer_by_id( protocol: cstring, timeoutMs: cuint, ) {.ffi.} = - let pId = PeerId.init($peerId).valueOr: - error "DIAL_PEER_BY_ID failed", error = $error - return err($error) - let conn = await ctx.myLib[].waku.node.peerManager.dialPeer(pId, $protocol) - if conn.isNone(): - let msg = "failed dialing peer" - error "DIAL_PEER_BY_ID failed", error = msg, peerId = $peerId - return err(msg) - + (await ctx.myLib[].waku.dialPeerById($peerId, $protocol, int(timeoutMs))).isOkOr: + error "DIAL_PEER_BY_ID failed", error = error + return err(error) return ok("") proc waku_get_connected_peers_info( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = ## returns a JSON string mapping peerIDs to objects with protocols and addresses + let peers = (await ctx.myLib[].waku.connectedPeersInfo()).valueOr: + return err(error) var peersMap = initTable[string, PeerInfo]() - let peers = ctx.myLib[].waku.node.peerManager.switch.peerStore.peers().filterIt( - it.connectedness == Connected - ) - - # Build a map of peer IDs to peer info objects for peer in peers: - let peerIdStr = $peer.peerId - peersMap[peerIdStr] = - PeerInfo(protocols: peer.protocols, addresses: peer.addrs.mapIt($it)) + peersMap[peer.peerId] = + PeerInfo(protocols: peer.protocols, addresses: peer.addresses) - # Convert the map to JSON string - let jsonObj = %*peersMap - let jsonStr = $jsonObj - return ok(jsonStr) + return ok($(%*peersMap)) proc waku_get_connected_peers( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = ## returns a comma-separated string of peerIDs - let - (inPeerIds, outPeerIds) = ctx.myLib[].waku.node.peerManager.connectedPeers() - connectedPeerids = concat(inPeerIds, outPeerIds) - - return ok(connectedPeerids.mapIt($it).join(",")) + let peerIds = (await ctx.myLib[].waku.connectedPeers()).valueOr: + return err(error) + return ok(peerIds.join(",")) proc waku_get_peerids_by_protocol( ctx: ptr FFIContext[LogosDelivery], @@ -125,9 +99,6 @@ proc waku_get_peerids_by_protocol( protocol: cstring, ) {.ffi.} = ## returns a comma-separated string of peerIDs that mount the given protocol - let connectedPeers = ctx.myLib[].waku.node.peerManager.switch.peerStore - .peers($protocol) - .filterIt(it.connectedness == Connected) - .mapIt($it.peerId) - .join(",") - return ok(connectedPeers) + let peerIds = (await ctx.myLib[].waku.peerIdsByProtocol($protocol)).valueOr: + return err(error) + return ok(peerIds.join(",")) diff --git a/library/kernel_api/ping_api.nim b/library/kernel_api/ping_api.nim index e6ed69dd1..6570fffd5 100644 --- a/library/kernel_api/ping_api.nim +++ b/library/kernel_api/ping_api.nim @@ -1,7 +1,5 @@ -import std/[json, strutils] import chronos, results, ffi -import libp2p/[protocols/ping, switch, multiaddress, multicodec] -import logos_delivery/waku/[waku, waku_core/peers, node/waku_node], library/declare_lib +import logos_delivery, library/declare_lib proc waku_ping_peer( ctx: ptr FFIContext[LogosDelivery], @@ -10,35 +8,6 @@ proc waku_ping_peer( peerAddr: cstring, timeoutMs: cuint, ) {.ffi.} = - let peerInfo = peers.parsePeerInfo(($peerAddr).split(",")).valueOr: - return err("PingRequest failed to parse peer addr: " & $error) - - let timeout = chronos.milliseconds(timeoutMs) - proc ping(): Future[Result[Duration, string]] {.async, gcsafe.} = - try: - let conn = await ctx.myLib[].waku.node.switch.dial( - peerInfo.peerId, peerInfo.addrs, PingCodec - ) - defer: - await conn.close() - - let pingRTT = await ctx.myLib[].waku.node.libp2pPing.ping(conn) - if pingRTT == 0.nanos: - return err("could not ping peer: rtt-0") - return ok(pingRTT) - except CatchableError as exc: - return err("could not ping peer: " & exc.msg) - - let pingFuture = ping() - let pingRTT: Duration = - if timeout == chronos.milliseconds(0): # No timeout expected - (await pingFuture).valueOr: - return err("ping failed, no timeout expected: " & error) - else: - let timedOut = not (await pingFuture.withTimeout(timeout)) - if timedOut: - return err("ping timed out") - pingFuture.read().valueOr: - return err("failed to read ping future: " & error) - - return ok($(pingRTT.nanos)) + let rttNanos = (await ctx.myLib[].waku.pingPeer($peerAddr, int(timeoutMs))).valueOr: + return err(error) + return ok($rttNanos) diff --git a/library/kernel_api/protocols/filter_api.nim b/library/kernel_api/protocols/filter_api.nim index a070bd2c7..cd613c1e0 100644 --- a/library/kernel_api/protocols/filter_api.nim +++ b/library/kernel_api/protocols/filter_api.nim @@ -1,29 +1,14 @@ -import logos_delivery/waku/compat/option_valueor -import options, std/[strutils, sequtils] +import std/[strutils, sequtils] import chronicles, chronos, results, ffi import - logos_delivery/waku/waku_filter_v2/client, + logos_delivery, logos_delivery/waku/waku_core/message/message, - logos_delivery/waku/waku, - logos_delivery/waku/waku_relay, - logos_delivery/waku/waku_filter_v2/common, logos_delivery/waku/waku_core/subscription/push_handler, - logos_delivery/waku/node/peer_manager/peer_manager, - logos_delivery/waku/waku_node, logos_delivery/waku/waku_core/topics/pubsub_topic, logos_delivery/waku/waku_core/topics/content_topic, library/events/json_message_event, library/declare_lib -const FilterOpTimeout = 5.seconds - -proc checkFilterClientMounted(waku: Waku): Result[string, string] = - if waku.node.wakuFilterClient.isNil(): - let errorMsg = "wakuFilterClient is not mounted" - error "fail filter process", error = errorMsg - return err(errorMsg) - return ok("") - proc waku_filter_subscribe( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, @@ -31,33 +16,20 @@ proc waku_filter_subscribe( pubSubTopic: cstring, contentTopics: cstring, ) {.ffi.} = - proc onReceivedMessage(ctx: ptr FFIContext): WakuRelayHandler = + proc onReceivedMessage(ctx: ptr FFIContext[LogosDelivery]): FilterPushHandler = return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} = callEventCallback(ctx, "onReceivedMessage"): $JsonMessageEvent.new(pubsubTopic, msg) - checkFilterClientMounted(ctx.myLib[].waku).isOkOr: - return err($error) - - var filterPushEventCallback = FilterPushHandler(onReceivedMessage(ctx)) - ctx.myLib[].waku.node.wakuFilterClient.registerPushHandler(filterPushEventCallback) - - let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: - let errorMsg = "could not find peer with WakuFilterSubscribeCodec when subscribing" - error "fail filter subscribe", error = errorMsg - return err(errorMsg) - - let subFut = ctx.myLib[].waku.node.filterSubscribe( - some(PubsubTopic($pubsubTopic)), - ($contentTopics).split(",").mapIt(ContentTopic(it)), - peer, - ) - if not await subFut.withTimeout(FilterOpTimeout): - let errorMsg = "filter subscription timed out" - error "fail filter unsubscribe", error = errorMsg - - return err(errorMsg) - + ( + await ctx.myLib[].waku.filterSubscribe( + PubsubTopic($pubSubTopic), + ($contentTopics).split(",").mapIt(ContentTopic(it)), + FilterPushHandler(onReceivedMessage(ctx)), + ) + ).isOkOr: + error "fail filter subscribe", error = error + return err(error) return ok("") proc waku_filter_unsubscribe( @@ -67,43 +39,19 @@ proc waku_filter_unsubscribe( pubSubTopic: cstring, contentTopics: cstring, ) {.ffi.} = - checkFilterClientMounted(ctx.myLib[].waku).isOkOr: - return err($error) - - let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: - let errorMsg = - "could not find peer with WakuFilterSubscribeCodec when unsubscribing" - error "fail filter process", error = errorMsg - return err(errorMsg) - - let subFut = ctx.myLib[].waku.node.filterUnsubscribe( - some(PubsubTopic($pubsubTopic)), - ($contentTopics).split(",").mapIt(ContentTopic(it)), - peer, - ) - if not await subFut.withTimeout(FilterOpTimeout): - let errorMsg = "filter un-subscription timed out" - error "fail filter unsubscribe", error = errorMsg - return err(errorMsg) + ( + await ctx.myLib[].waku.filterUnsubscribe( + PubsubTopic($pubSubTopic), ($contentTopics).split(",").mapIt(ContentTopic(it)) + ) + ).isOkOr: + error "fail filter unsubscribe", error = error + return err(error) return ok("") proc waku_filter_unsubscribe_all( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - checkFilterClientMounted(ctx.myLib[].waku).isOkOr: - return err($error) - - let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: - let errorMsg = - "could not find peer with WakuFilterSubscribeCodec when unsubscribing all" - error "fail filter unsubscribe all", error = errorMsg - return err(errorMsg) - - let unsubFut = ctx.myLib[].waku.node.filterUnsubscribeAll(peer) - - if not await unsubFut.withTimeout(FilterOpTimeout): - let errorMsg = "filter un-subscription all timed out" - error "fail filter unsubscribe all", error = errorMsg - - return err(errorMsg) + (await ctx.myLib[].waku.filterUnsubscribeAll()).isOkOr: + error "fail filter unsubscribe all", error = error + return err(error) return ok("") diff --git a/library/kernel_api/protocols/lightpush_api.nim b/library/kernel_api/protocols/lightpush_api.nim index eb0d1de09..c7bc8169d 100644 --- a/library/kernel_api/protocols/lightpush_api.nim +++ b/library/kernel_api/protocols/lightpush_api.nim @@ -1,14 +1,9 @@ -import logos_delivery/waku/compat/option_valueor -import options, std/[json, strformat] +import std/[json, strformat] import chronicles, chronos, results, ffi import - logos_delivery/waku/waku_core/message/message, - logos_delivery/waku/waku_core/codecs, - logos_delivery/waku/waku, + logos_delivery, logos_delivery/waku/waku_core/message, logos_delivery/waku/waku_core/topics/pubsub_topic, - logos_delivery/waku/waku_lightpush_legacy/client, - logos_delivery/waku/node/peer_manager/peer_manager, library/events/json_message_event, library/declare_lib @@ -19,11 +14,6 @@ proc waku_lightpush_publish( pubSubTopic: cstring, jsonWakuMessage: cstring, ) {.ffi.} = - if ctx.myLib[].waku.node.wakuLightpushClient.isNil(): - let errorMsg = "LightpushRequest waku.node.wakuLightpushClient is nil" - error "PUBLISH failed", error = errorMsg - return err(errorMsg) - var jsonMessage: JsonMessage try: let jsonContent = parseJson($jsonWakuMessage) @@ -35,18 +25,10 @@ proc waku_lightpush_publish( let msg = json_message_event.toWakuMessage(jsonMessage).valueOr: return err("Problem building the WakuMessage: " & $error) - let peerOpt = ctx.myLib[].waku.node.peerManager.selectPeer(WakuLightPushCodec) - if peerOpt.isNone(): - let errorMsg = "failed to lightpublish message, no suitable remote peers" - error "PUBLISH failed", error = errorMsg - return err(errorMsg) - let msgHashHex = ( - await ctx.myLib[].waku.node.wakuLegacyLightpushClient.publish( - $pubsubTopic, msg, peer = peerOpt.get() - ) + await ctx.myLib[].waku.lightpushPublish(PubsubTopic($pubSubTopic), msg) ).valueOr: error "PUBLISH failed", error = error - return err($error) + return err(error) return ok(msgHashHex) diff --git a/library/kernel_api/protocols/relay_api.nim b/library/kernel_api/protocols/relay_api.nim index d580597ae..528ab7ba2 100644 --- a/library/kernel_api/protocols/relay_api.nim +++ b/library/kernel_api/protocols/relay_api.nim @@ -1,17 +1,10 @@ -import logos_delivery/waku/compat/option_valueor -import std/[net, sequtils, strutils, json], strformat -import chronicles, chronos, stew/byteutils, results, ffi +import std/[strutils, json] +import chronicles, chronos, results, ffi import - logos_delivery/waku/waku_core/message/message, - logos_delivery/waku/factory/validator_signed, - logos_delivery/waku/waku, - tools/confutils/cli_args, - logos_delivery/waku/waku_core/message, + logos_delivery, logos_delivery/waku/waku_core/topics/pubsub_topic, - logos_delivery/waku/waku_core/topics, - logos_delivery/waku/node/waku_node/relay, + logos_delivery/waku/waku_core/message, logos_delivery/waku/waku_relay/protocol, - logos_delivery/waku/node/peer_manager, library/events/json_message_event, library/declare_lib @@ -21,11 +14,11 @@ proc waku_relay_get_peers_in_mesh( userData: pointer, pubSubTopic: cstring, ) {.ffi.} = - let meshPeers = ctx.myLib[].waku.node.wakuRelay.getPeersInMesh($pubsubTopic).valueOr: + let peers = (await ctx.myLib[].waku.relayPeersInMesh(PubsubTopic($pubSubTopic))).valueOr: error "LIST_MESH_PEERS failed", error = error - return err($error) + return err(error) ## returns a comma-separated string of peerIDs - return ok(meshPeers.mapIt($it).join(",")) + return ok(peers.join(",")) proc waku_relay_get_num_peers_in_mesh( ctx: ptr FFIContext[LogosDelivery], @@ -33,10 +26,10 @@ proc waku_relay_get_num_peers_in_mesh( userData: pointer, pubSubTopic: cstring, ) {.ffi.} = - let numPeersInMesh = ctx.myLib[].waku.node.wakuRelay.getNumPeersInMesh($pubsubTopic).valueOr: + let n = (await ctx.myLib[].waku.relayNumPeersInMesh(PubsubTopic($pubSubTopic))).valueOr: error "NUM_MESH_PEERS failed", error = error - return err($error) - return ok($numPeersInMesh) + return err(error) + return ok($n) proc waku_relay_get_connected_peers( ctx: ptr FFIContext[LogosDelivery], @@ -45,11 +38,10 @@ proc waku_relay_get_connected_peers( pubSubTopic: cstring, ) {.ffi.} = ## Returns the list of all connected peers to an specific pubsub topic - let connPeers = ctx.myLib[].waku.node.wakuRelay.getConnectedPeers($pubsubTopic).valueOr: + let peers = (await ctx.myLib[].waku.relayConnectedPeers(PubsubTopic($pubSubTopic))).valueOr: error "LIST_CONNECTED_PEERS failed", error = error - return err($error) - ## returns a comma-separated string of peerIDs - return ok(connPeers.mapIt($it).join(",")) + return err(error) + return ok(peers.join(",")) proc waku_relay_get_num_connected_peers( ctx: ptr FFIContext[LogosDelivery], @@ -57,10 +49,10 @@ proc waku_relay_get_num_connected_peers( userData: pointer, pubSubTopic: cstring, ) {.ffi.} = - let numConnPeers = ctx.myLib[].waku.node.wakuRelay.getNumConnectedPeers($pubsubTopic).valueOr: + let n = (await ctx.myLib[].waku.relayNumConnectedPeers(PubsubTopic($pubSubTopic))).valueOr: error "NUM_CONNECTED_PEERS failed", error = error - return err($error) - return ok($numConnPeers) + return err(error) + return ok($n) proc waku_relay_add_protected_shard( ctx: ptr FFIContext[LogosDelivery], @@ -71,15 +63,12 @@ proc waku_relay_add_protected_shard( publicKey: cstring, ) {.ffi.} = ## Protects a shard with a public key - try: - let relayShard = RelayShard(clusterId: uint16(clusterId), shardId: uint16(shardId)) - let protectedShard = ProtectedShard.parseCmdArg($relayShard & ":" & $publicKey) - ctx.myLib[].waku.node.wakuRelay.addSignedShardsValidator( - @[protectedShard], uint16(clusterId) + ( + await ctx.myLib[].waku.relayAddProtectedShard( + uint16(clusterId), uint16(shardId), $publicKey ) - except ValueError as exc: - return err("ERROR in waku_relay_add_protected_shard: " & exc.msg) - + ).isOkOr: + return err(error) return ok("") proc waku_relay_subscribe( @@ -88,20 +77,18 @@ proc waku_relay_subscribe( userData: pointer, pubSubTopic: cstring, ) {.ffi.} = - echo "Subscribing to topic: " & $pubSubTopic & " ..." proc onReceivedMessage(ctx: ptr FFIContext[LogosDelivery]): WakuRelayHandler = return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} = callEventCallback(ctx, "onReceivedMessage"): $JsonMessageEvent.new(pubsubTopic, msg) - var cb = onReceivedMessage(ctx) - - ctx.myLib[].waku.node.subscribe( - (kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic), - handler = WakuRelayHandler(cb), + ( + await ctx.myLib[].waku.relaySubscribe( + PubsubTopic($pubSubTopic), WakuRelayHandler(onReceivedMessage(ctx)) + ) ).isOkOr: error "SUBSCRIBE failed", error = error - return err($error) + return err(error) return ok("") proc waku_relay_unsubscribe( @@ -110,12 +97,9 @@ proc waku_relay_unsubscribe( userData: pointer, pubSubTopic: cstring, ) {.ffi.} = - ctx.myLib[].waku.node.unsubscribe( - (kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic) - ).isOkOr: + (await ctx.myLib[].waku.relayUnsubscribe(PubsubTopic($pubSubTopic))).isOkOr: error "UNSUBSCRIBE failed", error = error - return err($error) - + return err(error) return ok("") proc waku_relay_publish( @@ -126,31 +110,30 @@ proc waku_relay_publish( jsonWakuMessage: cstring, timeoutMs: cuint, ) {.ffi.} = - var - # https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms - jsonMessage: JsonMessage + var jsonMessage: JsonMessage try: let jsonContent = parseJson($jsonWakuMessage) jsonMessage = JsonMessage.fromJsonNode(jsonContent).valueOr: raise newException(JsonParsingError, $error) except JsonParsingError as exc: - return err(fmt"Error parsing json message: {exc.msg}") + return err("Error parsing json message: " & exc.msg) let msg = json_message_event.toWakuMessage(jsonMessage).valueOr: return err("Problem building the WakuMessage: " & $error) - (await ctx.myLib[].waku.node.wakuRelay.publish($pubsubTopic, msg)).isOkOr: + let msgHash = ( + await ctx.myLib[].waku.relayPublish(PubsubTopic($pubSubTopic), msg, uint32(timeoutMs)) + ).valueOr: error "PUBLISH failed", error = error - return err($error) - - let msgHash = computeMessageHash($pubSubTopic, msg).to0xHex + return err(error) return ok(msgHash) proc waku_default_pubsub_topic( ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer ) {.ffi.} = - # https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic - return ok(DefaultPubsubTopic) + let topic = (await ctx.myLib[].waku.defaultPubsubTopic()).valueOr: + return err(error) + return ok(string(topic)) proc waku_content_topic( ctx: ptr FFIContext[LogosDelivery], @@ -161,9 +144,13 @@ proc waku_content_topic( contentTopicName: cstring, encoding: cstring, ) {.ffi.} = - # https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding - - return ok(fmt"/{$appName}/{$appVersion}/{$contentTopicName}/{$encoding}") + let topic = ( + await ctx.myLib[].waku.buildContentTopic( + $appName, uint32(appVersion), $contentTopicName, $encoding + ) + ).valueOr: + return err(error) + return ok(string(topic)) proc waku_pubsub_topic( ctx: ptr FFIContext[LogosDelivery], @@ -171,5 +158,6 @@ proc waku_pubsub_topic( userData: pointer, topicName: cstring, ) {.ffi.} = - # https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding - return ok(fmt"/waku/2/{$topicName}") + let topic = (await ctx.myLib[].waku.buildPubsubTopic($topicName)).valueOr: + return err(error) + return ok(string(topic)) diff --git a/library/kernel_api/protocols/store_api.nim b/library/kernel_api/protocols/store_api.nim index c6356bf6a..75d43fee1 100644 --- a/library/kernel_api/protocols/store_api.nim +++ b/library/kernel_api/protocols/store_api.nim @@ -1,13 +1,10 @@ -import logos_delivery/waku/compat/option_valueor -import std/[json, sugar, strutils, options] -import chronos, chronicles, results, stew/byteutils, ffi +import std/[json, sugar, options] +import chronos, chronicles, results, ffi import - logos_delivery/waku/waku, + logos_delivery, library/utils, - logos_delivery/waku/waku_core/peers, logos_delivery/waku/waku_core/message/digest, logos_delivery/waku/waku_store/common, - logos_delivery/waku/waku_store/client, logos_delivery/waku/common/paging, library/declare_lib @@ -83,13 +80,10 @@ proc waku_store_query( let storeQueryRequest = ?fromJsonNode(jsonContentRes.get()) - let peer = peers.parsePeerInfo(($peerAddr).split(",")).valueOr: - return err("StoreRequest failed to parse peer addr: " & $error) - let queryResponse = ( - await ctx.myLib[].waku.node.wakuStoreClient.query(storeQueryRequest, peer) + await ctx.myLib[].waku.storeQuery(storeQueryRequest, $peerAddr, int(timeoutMs)) ).valueOr: - return err("StoreRequest failed store query: " & $error) + return err("StoreRequest failed store query: " & error) let res = $(%*(queryResponse.toHex())) return ok(res) ## returning the response in json format diff --git a/library/logos_delivery_api/node_api.nim b/library/logos_delivery_api/node_api.nim index 354ab3a98..0a5436bc8 100644 --- a/library/logos_delivery_api/node_api.nim +++ b/library/logos_delivery_api/node_api.nim @@ -3,9 +3,8 @@ import chronos, chronicles, results, ffi import logos_delivery, logos_delivery/waku/node/waku_node, - logos_delivery/waku/events/message_events, logos_delivery/api/types, - logos_delivery/waku/events/[message_events, health_events], + logos_delivery/waku/api/events/health_events, tools/confutils/conf_from_json, ../declare_lib, ../json_event diff --git a/logos_delivery/channels/api/channel_lifecycle.nim b/logos_delivery/channels/api/channel_lifecycle.nim new file mode 100644 index 000000000..b50ba1b63 --- /dev/null +++ b/logos_delivery/channels/api/channel_lifecycle.nim @@ -0,0 +1,89 @@ +## Reliable Channel layer API — channel lifecycle +## (createReliableChannel / closeChannel). +import std/[options, tables] +import results, chronos, chronicles + +import logos_delivery/api/types +import logos_delivery/channels/reliable_channel_manager +import logos_delivery/channels/reliable_channel +import logos_delivery/waku/persistency/sds_persistency + +# ReliableChannel, SendHandler, config and wire-version markers. +export reliable_channel + +const SdsJobId = "sds" + ## One persistency job shared by every channel's SDS state; rows are + ## keyed by channelId. + +proc sdsPersistence(): Option[Persistence] = + ## SDS backend from the Persistency singleton; memory-only fallback when + ## it is unavailable (e.g. unit tests). + let p = Persistency.instance().valueOr: + info "SDS persistence disabled, running memory-only", reason = $error + return none(Persistence) + let job = p.openJob(SdsJobId).valueOr: + warn "SDS persistence disabled, could not open persistency job", + jobId = SdsJobId, reason = $error + return none(Persistence) + return some(newSdsPersistence(job)) + +proc createReliableChannel*( + self: ReliableChannelManager, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, + sendHandler: SendHandler = nil, +): Result[ChannelId, string] = + ## Spec entry point. The `sendHandler` and `rng` the channel needs are + ## sourced from the owning `ReliableChannelManager` rather than passed + ## per call. Encryption is wired up through the `Encrypt`/`Decrypt` + ## request brokers — the application installs its own providers + ## (or `setNoopEncryption()`) before traffic flows. + ## + ## `sendHandler` defaults to the manager's default (constructed at mount + ## from `MessagingClient.send`); tests pass a fake to bypass the network. + if self.channels.hasKey(channelId): + return err("channel already exists: " & channelId) + + let segConfig = SegmentationConfig( + segmentSizeBytes: DefaultSegmentSizeBytes, + enableReedSolomon: false, + persistence: nil, + ) + let sdsConfig = SdsConfig( + acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs, + maxRetransmissions: DefaultMaxRetransmissions, + causalHistorySize: DefaultCausalHistorySize, + persistence: sdsPersistence(), + ) + let rateConfig = RateLimitConfig( + epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch + ) + + let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler + + let chn = ReliableChannel.new( + sendHandler = effectiveSendHandler, + channelId = channelId, + contentTopic = contentTopic, + senderId = senderId, + segConfig = segConfig, + sdsConfig = sdsConfig, + rateConfig = rateConfig, + brokerCtx = self.brokerCtx, + ) + + self.channels[channelId] = chn + return ok(channelId) + +proc closeChannel*( + self: ReliableChannelManager, channelId: ChannelId +): Future[Result[void, string]] {.async: (raises: []).} = + ## Stops the channel's SDS loops and releases the channel. Persisted SDS + ## state survives, so re-creating the channel restores it. + let chn = self.channels.getOrDefault(channelId) + if chn.isNil(): + return err("unknown channel: " & channelId) + self.channels.del(channelId) + await chn.stop() + return ok() diff --git a/logos_delivery/channels/events.nim b/logos_delivery/channels/api/events.nim similarity index 80% rename from logos_delivery/channels/events.nim rename to logos_delivery/channels/api/events.nim index 5f69095e4..685059053 100644 --- a/logos_delivery/channels/events.nim +++ b/logos_delivery/channels/api/events.nim @@ -1,20 +1,20 @@ -## Reliable Channel event types emitted to API consumers. +## Reliable Channel layer API — event surface. ## ## Lifecycle events for individual segments (sent / propagated / errored) ## are the same as the network-level ones the MessagingClient already ## emits — `requestId` is shared across layers — so we just re-export -## `waku/events/message_events` and avoid declaring duplicates. +## `messaging/api/events` and avoid declaring duplicates. ## ## Only the channel-level `MessageReceivedEvent` carries data that has ## no analogue in the lower layer (reassembled application payload, ## senderId, channelId), so it lives here. -import logos_delivery/waku/events/message_events as waku_message_events +import logos_delivery/messaging/api/events as messaging_events import brokers/event_broker -import ./types as channel_types +import logos_delivery/channels/types as channel_types -export waku_message_events, channel_types, event_broker +export messaging_events, channel_types, event_broker EventBroker: type ChannelMessageReceivedEvent* = object diff --git a/logos_delivery/channels/api/send.nim b/logos_delivery/channels/api/send.nim new file mode 100644 index 000000000..23c41ac13 --- /dev/null +++ b/logos_delivery/channels/api/send.nim @@ -0,0 +1,21 @@ +## Reliable Channel layer API — channel send operation. +import std/tables +import results, chronos + +import logos_delivery/api/types +import logos_delivery/channels/reliable_channel_manager +import logos_delivery/channels/reliable_channel + +proc send*( + self: ReliableChannelManager, + channelId: ChannelId, + appPayload: seq[byte], + ephemeral: bool = false, +): Future[Result[RequestId, string]] {.async: (raises: []).} = + ## Spec-level entry point. Looks the channel up by id and delegates + ## to `ReliableChannel.send`, which exposes the visible pipeline + ## segmentation -> sds -> rate_limit_manager -> encryption. + let chn = self.channels.getOrDefault(channelId) + if chn.isNil(): + return err("unknown channel: " & channelId) + return await chn.send(appPayload, ephemeral) diff --git a/logos_delivery/channels/reliable_channel.nim b/logos_delivery/channels/reliable_channel.nim index 307dc17a4..39cea259e 100644 --- a/logos_delivery/channels/reliable_channel.nim +++ b/logos_delivery/channels/reliable_channel.nim @@ -25,7 +25,7 @@ import logos_delivery/api/types import logos_delivery/messaging/delivery_service/send_service import logos_delivery/waku/waku_core/topics -import ./events +import logos_delivery/channels/api/events import ./segmentation/segmentation import ./scalable_data_sync/scalable_data_sync import ./rate_limit_manager/rate_limit_manager diff --git a/logos_delivery/channels/reliable_channel_manager.nim b/logos_delivery/channels/reliable_channel_manager.nim index 29feab0b9..3eafe880e 100644 --- a/logos_delivery/channels/reliable_channel_manager.nim +++ b/logos_delivery/channels/reliable_channel_manager.nim @@ -13,21 +13,14 @@ import stew/byteutils import brokers/broker_context -import logos_delivery/waku/events/message_events as waku_message_events import logos_delivery/messaging/messaging_client +import logos_delivery/messaging/api/send import logos_delivery/api/types -import logos_delivery/waku/waku_core/topics -import logos_delivery/waku/persistency/sds_persistency import ./reliable_channel -import ./encryption/noop_encryption export reliable_channel -const SdsJobId = "sds" - ## One persistency job shared by every channel's SDS state; rows are - ## keyed by channelId. - type ReliableChannelManagerConf* = object ## Per-layer config object for the reliable @@ -35,13 +28,13 @@ type ## will move here in a follow-up PR); kept so each layer owns its own config. ReliableChannelManager* = ref object - channels: Table[ChannelId, ReliableChannel] + channels*: Table[ChannelId, ReliableChannel] ## read by `channels/api.nim` messagingClient: MessagingClient ## The channel layer chains onto messaging. - sendHandler: SendHandler + sendHandler*: SendHandler ## Default egress dispatch for channels created through this manager. ## Built in `new` as a closure over `MessagingClient.send` so the channel ## layer itself stays callable-only. - brokerCtx: BrokerContext + brokerCtx*: BrokerContext proc new*( T: type ReliableChannelManager, @@ -82,96 +75,6 @@ proc stop*(self: ReliableChannelManager) {.async.} = await chn.stop() self.channels.clear() -proc sdsPersistence(): Option[Persistence] = - ## SDS backend from the Persistency singleton; memory-only fallback when - ## it is unavailable (e.g. unit tests). - let p = Persistency.instance().valueOr: - info "SDS persistence disabled, running memory-only", reason = $error - return none(Persistence) - let job = p.openJob(SdsJobId).valueOr: - warn "SDS persistence disabled, could not open persistency job", - jobId = SdsJobId, reason = $error - return none(Persistence) - return some(newSdsPersistence(job)) - -proc createReliableChannel*( - self: ReliableChannelManager, - channelId: ChannelId, - contentTopic: ContentTopic, - senderId: SdsParticipantID, - sendHandler: SendHandler = nil, -): Result[ChannelId, string] = - ## Spec entry point. The `sendHandler` and `rng` the channel needs are - ## sourced from the owning `ReliableChannelManager` rather than passed - ## per call. Encryption is wired up through the `Encrypt`/`Decrypt` - ## request brokers — the application installs its own providers - ## (or `setNoopEncryption()`) before traffic flows. - ## - ## Segmentation, SDS and rate-limit configs will eventually be read - ## from the node's `NodeConfig`. Defaults for now. - ## - ## `sendHandler` defaults to the manager's default (constructed at mount - ## from `MessagingClient.send`); tests pass a fake to bypass the network. - if self.channels.hasKey(channelId): - return err("channel already exists: " & channelId) - - let segConfig = SegmentationConfig( - segmentSizeBytes: DefaultSegmentSizeBytes, - enableReedSolomon: false, - persistence: nil, - ) - let sdsConfig = SdsConfig( - acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs, - maxRetransmissions: DefaultMaxRetransmissions, - causalHistorySize: DefaultCausalHistorySize, - persistence: sdsPersistence(), - ) - let rateConfig = RateLimitConfig( - epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch - ) - - let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler - - let chn = ReliableChannel.new( - sendHandler = effectiveSendHandler, - channelId = channelId, - contentTopic = contentTopic, - senderId = senderId, - segConfig = segConfig, - sdsConfig = sdsConfig, - rateConfig = rateConfig, - brokerCtx = self.brokerCtx, - ) - - self.channels[channelId] = chn - return ok(channelId) - -proc closeChannel*( - self: ReliableChannelManager, channelId: ChannelId -): Future[Result[void, string]] {.async: (raises: []).} = - ## Stops the channel's SDS loops and releases the channel. Persisted SDS - ## state survives, so re-creating the channel restores it. - let chn = self.channels.getOrDefault(channelId) - if chn.isNil(): - return err("unknown channel: " & channelId) - self.channels.del(channelId) - await chn.stop() - return ok() - -proc send*( - self: ReliableChannelManager, - channelId: ChannelId, - appPayload: seq[byte], - ephemeral: bool = false, -): Future[Result[RequestId, string]] {.async: (raises: []).} = - ## Spec-level entry point. Looks the channel up by id and delegates - ## to `ReliableChannel.send`, which exposes the visible pipeline - ## segmentation -> sds -> rate_limit_manager -> encryption. - let chn = self.channels.getOrDefault(channelId) - if chn.isNil(): - return err("unknown channel: " & channelId) - return await chn.send(appPayload, ephemeral) - ## Inbound messages are not handed to the manager by direct call. Each ## `ReliableChannel` installs its own `MessageReceivedEvent` listener ## in `ReliableChannel.new`, filters by spec marker and `contentTopic`, diff --git a/logos_delivery/logos_delivery.nim b/logos_delivery/logos_delivery.nim index f61202b9a..8f22b397f 100644 --- a/logos_delivery/logos_delivery.nim +++ b/logos_delivery/logos_delivery.nim @@ -11,12 +11,42 @@ import results, chronos, chronicles +# Each layer has a core module (type + new/start/stop) and an api/ folder whose +# modules each implement a differentiated set of operations, plus an events +# surface. The concentrator re-exports them so library consumers get the full +# surface from `import logos_delivery`. (The per-layer `events` modules share a +# stem, so they are imported under aliases.) + +# Waku layer import logos_delivery/waku/waku export waku +import + logos_delivery/waku/api/[ + topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health, + ping, + ] +export + topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health, ping +import logos_delivery/waku/api/events/[message_events, health_events] +export message_events, health_events + +# Messaging layer import logos_delivery/messaging/messaging_client export messaging_client +import logos_delivery/messaging/api/[subscription, send] +export subscription, send +import logos_delivery/messaging/api/events as messaging_api_events +export messaging_api_events + +# Reliable Channel layer import logos_delivery/channels/reliable_channel_manager export reliable_channel_manager +import logos_delivery/channels/api/channel_lifecycle +export channel_lifecycle +import logos_delivery/channels/api/send as channel_send +export channel_send +import logos_delivery/channels/api/events as channels_api_events +export channels_api_events import logos_delivery/waku/factory/waku_conf import logos_delivery/waku/factory/app_callbacks diff --git a/logos_delivery/waku/events/message_events.nim b/logos_delivery/messaging/api/events.nim similarity index 73% rename from logos_delivery/waku/events/message_events.nim rename to logos_delivery/messaging/api/events.nim index 2e4bece80..89ec704b9 100644 --- a/logos_delivery/waku/events/message_events.nim +++ b/logos_delivery/messaging/api/events.nim @@ -1,7 +1,8 @@ +## Messaging layer API — event surface (messaging-level message events). import brokers/event_broker import logos_delivery/api/types -import logos_delivery/waku/[waku_core/message, waku_core/topics] -export types +import logos_delivery/waku/waku_core/message +export event_broker, types EventBroker: # Event emitted when a message is sent to the network @@ -27,9 +28,3 @@ EventBroker: type MessageReceivedEvent* = object messageHash*: string message*: WakuMessage - -EventBroker: - # Internal event emitted when a message arrives from the network via any protocol - type MessageSeenEvent* = object - topic*: PubsubTopic - message*: WakuMessage diff --git a/logos_delivery/messaging/api/send.nim b/logos_delivery/messaging/api/send.nim new file mode 100644 index 000000000..cc6312471 --- /dev/null +++ b/logos_delivery/messaging/api/send.nim @@ -0,0 +1,34 @@ +## Messaging layer API — send operation. +import results, chronos, chronicles + +import logos_delivery/api/types +import logos_delivery/messaging/messaging_client +import logos_delivery/waku/node/[waku_node, subscription_manager] +import logos_delivery/messaging/delivery_service/send_service +import logos_delivery/messaging/delivery_service/send_service/delivery_task + +proc send*( + self: MessagingClient, envelope: MessageEnvelope +): Future[Result[RequestId, string]] {.async.} = + ## High-level messaging API send. Auto-subscribes to the content topic + ## (so the local node sees its own gossipsub broadcast), builds a + ## `DeliveryTask`, and hands it to the send service. Returns the request + ## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`. + ?self.checkApiAvailability() + + let isSubbed = + self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false) + if not isSubbed: + info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic + self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: + warn "Failed to auto-subscribe", error = error + return err("Failed to auto-subscribe before sending: " & error) + + let requestId = RequestId.new(self.node.rng) + + let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr: + return err("MessagingClient.send: Failed to create delivery task: " & error) + + asyncSpawn self.sendService.send(deliveryTask) + + return ok(requestId) diff --git a/logos_delivery/messaging/api/subscription.nim b/logos_delivery/messaging/api/subscription.nim new file mode 100644 index 000000000..35b8c7e53 --- /dev/null +++ b/logos_delivery/messaging/api/subscription.nim @@ -0,0 +1,18 @@ +## Messaging layer API — subscription operations. +import results, chronos + +import logos_delivery/api/types +import logos_delivery/messaging/messaging_client +import logos_delivery/waku/node/[waku_node, subscription_manager] + +proc subscribe*( + self: MessagingClient, contentTopic: ContentTopic +): Future[Result[void, string]] {.async.} = + ?self.checkApiAvailability() + return self.node.subscriptionManager.subscribe(contentTopic) + +proc unsubscribe*( + self: MessagingClient, contentTopic: ContentTopic +): Result[void, string] = + ?self.checkApiAvailability() + return self.node.subscriptionManager.unsubscribe(contentTopic) diff --git a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim index 90bdb0839..a6e9701d9 100644 --- a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim +++ b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim @@ -13,11 +13,12 @@ import waku_store/client, waku_store/common, waku_filter_v2/client, - events/message_events, - events/health_events, + api/events/message_events, + api/events/health_events, waku_node, node/subscription_manager, ] +import logos_delivery/messaging/api/events const MaxMessageLife = chronos.minutes(7) ## Max time we will keep track of rx messages diff --git a/logos_delivery/messaging/delivery_service/send_service/send_service.nim b/logos_delivery/messaging/delivery_service/send_service/send_service.nim index 00f2ff672..5e21ac4ba 100644 --- a/logos_delivery/messaging/delivery_service/send_service/send_service.nim +++ b/logos_delivery/messaging/delivery_service/send_service/send_service.nim @@ -18,8 +18,8 @@ import waku_rln_relay/rln_relay, waku_lightpush/client, waku_lightpush/callbacks, - events/message_events, ] +import logos_delivery/messaging/api/events logScope: topics = "send service" diff --git a/logos_delivery/messaging/messaging_client.nim b/logos_delivery/messaging/messaging_client.nim index 96cd13eb1..a92d045f5 100644 --- a/logos_delivery/messaging/messaging_client.nim +++ b/logos_delivery/messaging/messaging_client.nim @@ -1,10 +1,10 @@ +## Messaging layer core: the `MessagingClient` type plus its construction and +## lifecycle. The public operations (subscribe / unsubscribe / send) live in +## `messaging/api.nim`. import results, chronos -import chronicles import - logos_delivery/api/types, - logos_delivery/waku/node/[waku_node, subscription_manager], - logos_delivery/messaging/delivery_service/[recv_service, send_service], - logos_delivery/messaging/delivery_service/send_service/delivery_task + logos_delivery/waku/node/waku_node, + logos_delivery/messaging/delivery_service/[recv_service, send_service] type MessagingClientConf* = object @@ -14,7 +14,7 @@ type useP2PReliability*: bool MessagingClient* = ref object - node: WakuNode + node*: WakuNode ## Waku core driven by this layer; read by `messaging/api.nim`. sendService*: SendService recvService*: RecvService started: bool @@ -43,46 +43,9 @@ proc stop*(self: MessagingClient) {.async.} = await self.recvService.stopRecvService() self.started = false -proc checkApiAvailability(self: MessagingClient): Result[void, string] = +proc checkApiAvailability*(self: MessagingClient): Result[void, string] = + ## Shared guard for the api operation module. if self.isNil(): return err("MessagingClient is not initialized") return ok() - -proc subscribe*( - self: MessagingClient, contentTopic: ContentTopic -): Future[Result[void, string]] {.async.} = - ?checkApiAvailability(self) - - return self.node.subscriptionManager.subscribe(contentTopic) - -proc unsubscribe*( - self: MessagingClient, contentTopic: ContentTopic -): Result[void, string] = - ?checkApiAvailability(self) - - return self.node.subscriptionManager.unsubscribe(contentTopic) - -proc send*( - self: MessagingClient, envelope: MessageEnvelope -): Future[Result[RequestId, string]] {.async.} = - ## High-level messaging API send. Auto-subscribes to the content topic - ## (so the local node sees its own gossipsub broadcast), builds a - ## `DeliveryTask`, and hands it to the send service. Returns the request - ## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`. - let isSubbed = - self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false) - if not isSubbed: - info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic - self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: - warn "Failed to auto-subscribe", error = error - return err("Failed to auto-subscribe before sending: " & error) - - let requestId = RequestId.new(self.node.rng) - - let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr: - return err("MessagingClient.send: Failed to create delivery task: " & error) - - asyncSpawn self.sendService.send(deliveryTask) - - return ok(requestId) diff --git a/logos_delivery/waku/api/debug.nim b/logos_delivery/waku/api/debug.nim new file mode 100644 index 000000000..10582316b --- /dev/null +++ b/logos_delivery/waku/api/debug.nim @@ -0,0 +1,36 @@ +## Waku layer API — debug / info operations. +{.push raises: [].} + +import results, chronos, chronicles, metrics +import eth/p2p/discoveryv5/enr + +import logos_delivery/waku/waku +import logos_delivery/waku/[waku_core, node/waku_node] + +proc version*(self: Waku): Future[Result[string, string]] {.async.} = + return ok(WakuNodeVersionString) + +proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + return ok(self.node.info().listenAddresses) + except CatchableError as e: + return err(e.msg) + +proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} = + try: + return ok(self.node.enr.toURI()) + except CatchableError as e: + return err(e.msg) + +proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} = + try: + return ok($self.node.peerId()) + except CatchableError as e: + return err(e.msg) + +proc metrics*(self: Waku): Future[Result[string, string]] {.async.} = + {.gcsafe.}: + try: + return ok(defaultRegistry.toText()) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/api/discovery.nim b/logos_delivery/waku/api/discovery.nim new file mode 100644 index 000000000..fefeb3d5d --- /dev/null +++ b/logos_delivery/waku/api/discovery.nim @@ -0,0 +1,76 @@ +## Waku layer API — discovery operations (DNS, discv5, peer exchange). +{.push raises: [].} + +import std/[net, sequtils] +import results, chronos, chronicles + +import logos_delivery/waku/waku +import + logos_delivery/waku/[ + waku_core, + node/waku_node, + node/waku_node/peer_exchange, + discovery/waku_dnsdisc, + discovery/waku_discv5, + ] + +proc dnsDiscovery*( + self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int +): Future[Result[seq[string], string]] {.async.} = + try: + let dnsNameServers = @[parseIpAddress(nameServer)] + let discoveredPeers = ( + await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers) + ).valueOr: + return err("failed discovering peers from DNS: " & $error) + + var multiAddresses = newSeq[string]() + for discPeer in discoveredPeers: + for address in discPeer.addrs: + multiAddresses.add($address & "/p2p/" & $discPeer) + + return ok(multiAddresses) + except CatchableError as e: + return err(e.msg) + +proc discv5UpdateBootnodes*( + self: Waku, bootnodes: string +): Future[Result[bool, string]] {.async.} = + ## `bootnodes` is a JSON array of ENRs, e.g. `["enr:...", "enr:..."]`. + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + self.wakuDiscv5.updateBootstrapRecords(bootnodes).isOkOr: + return err("error in discv5UpdateBootnodes: " & $error) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + (await self.wakuDiscv5.start()).isOkOr: + return err("error starting discv5: " & $error) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + await self.wakuDiscv5.stop() + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc peerExchangeRequest*( + self: Waku, numPeers: uint64 +): Future[Result[int, string]] {.async.} = + try: + let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr: + return err("failed peer exchange: " & $error) + return ok(numPeersRecv) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/events/discovery_events.nim b/logos_delivery/waku/api/events/discovery_events.nim similarity index 100% rename from logos_delivery/waku/events/discovery_events.nim rename to logos_delivery/waku/api/events/discovery_events.nim diff --git a/logos_delivery/waku/api/events/events.nim b/logos_delivery/waku/api/events/events.nim new file mode 100644 index 000000000..1340d60ba --- /dev/null +++ b/logos_delivery/waku/api/events/events.nim @@ -0,0 +1,8 @@ +import + ./[ + message_events, filter_subscribe_events, health_events, peer_events, + discovery_events, + ] + +export + message_events, filter_subscribe_events, health_events, peer_events, discovery_events diff --git a/logos_delivery/waku/events/delivery_events.nim b/logos_delivery/waku/api/events/filter_subscribe_events.nim similarity index 100% rename from logos_delivery/waku/events/delivery_events.nim rename to logos_delivery/waku/api/events/filter_subscribe_events.nim diff --git a/logos_delivery/waku/events/health_events.nim b/logos_delivery/waku/api/events/health_events.nim similarity index 100% rename from logos_delivery/waku/events/health_events.nim rename to logos_delivery/waku/api/events/health_events.nim diff --git a/logos_delivery/waku/api/events/message_events.nim b/logos_delivery/waku/api/events/message_events.nim new file mode 100644 index 000000000..638d4f38a --- /dev/null +++ b/logos_delivery/waku/api/events/message_events.nim @@ -0,0 +1,10 @@ +import brokers/event_broker +import logos_delivery/api/types +import logos_delivery/waku/[waku_core/message, waku_core/topics] +export event_broker, types + +EventBroker: + # Internal event emitted when a message arrives from the network via any protocol + type MessageSeenEvent* = object + topic*: PubsubTopic + message*: WakuMessage diff --git a/logos_delivery/waku/events/peer_events.nim b/logos_delivery/waku/api/events/peer_events.nim similarity index 100% rename from logos_delivery/waku/events/peer_events.nim rename to logos_delivery/waku/api/events/peer_events.nim diff --git a/logos_delivery/waku/api/filter.nim b/logos_delivery/waku/api/filter.nim new file mode 100644 index 000000000..06c78213f --- /dev/null +++ b/logos_delivery/waku/api/filter.nim @@ -0,0 +1,88 @@ +## Waku layer API — filter (light client) operations. +import logos_delivery/waku/compat/option_valueor +{.push raises: [].} + +import std/options +import results, chronos, chronicles + +import logos_delivery/waku/waku +import + logos_delivery/waku/[ + waku_core, + waku_core/subscription/push_handler, + node/waku_node, + node/waku_node/filter, + node/peer_manager, + waku_filter_v2/client, + waku_filter_v2/common, + ] + +const FilterOpTimeout = 5.seconds + +proc filterSubscribe*( + self: Waku, + pubsubTopic: PubsubTopic, + contentTopics: seq[ContentTopic], + pushHandler: FilterPushHandler, +): Future[Result[bool, string]] {.async.} = + ## Registers `pushHandler` for incoming filtered messages, selects a filter + ## service peer, and subscribes. + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + self.node.wakuFilterClient.registerPushHandler(pushHandler) + + let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + return err("could not find peer with WakuFilterSubscribeCodec when subscribing") + + let subFut = self.node.filterSubscribe(some(pubsubTopic), contentTopics, peer) + if not await subFut.withTimeout(FilterOpTimeout): + return err("filter subscription timed out") + subFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc filterUnsubscribe*( + self: Waku, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic] +): Future[Result[bool, string]] {.async.} = + ## Selects a filter service peer and unsubscribes the given content topics. + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + return err("could not find peer with WakuFilterSubscribeCodec when unsubscribing") + + let unsubFut = self.node.filterUnsubscribe(some(pubsubTopic), contentTopics, peer) + if not await unsubFut.withTimeout(FilterOpTimeout): + return err("filter un-subscription timed out") + unsubFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc filterUnsubscribeAll*(self: Waku): Future[Result[bool, string]] {.async.} = + ## Selects a filter service peer and unsubscribes from everything. + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + return + err("could not find peer with WakuFilterSubscribeCodec when unsubscribing all") + + let unsubFut = self.node.filterUnsubscribeAll(peer) + if not await unsubFut.withTimeout(FilterOpTimeout): + return err("filter un-subscription all timed out") + unsubFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/api/health.nim b/logos_delivery/waku/api/health.nim new file mode 100644 index 000000000..7faeebe4d --- /dev/null +++ b/logos_delivery/waku/api/health.nim @@ -0,0 +1,13 @@ +## Waku layer API — health / connectivity. +{.push raises: [].} + +import results, chronos, chronicles + +import logos_delivery/waku/waku +import logos_delivery/waku/[node/health_monitor, node/health_monitor/online_monitor] + +proc isOnline*(self: Waku): Future[Result[bool, string]] {.async.} = + try: + return ok(self.healthMonitor.onlineMonitor.amIOnline()) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/api/lightpush.nim b/logos_delivery/waku/api/lightpush.nim new file mode 100644 index 000000000..6208a2643 --- /dev/null +++ b/logos_delivery/waku/api/lightpush.nim @@ -0,0 +1,37 @@ +## Waku layer API — lightpush (light client publish) operations. +import logos_delivery/waku/compat/option_valueor +{.push raises: [].} + +import results, chronos, chronicles + +import logos_delivery/waku/waku +import + logos_delivery/waku/[ + waku_core, + waku_core/codecs, + node/waku_node, + node/peer_manager, + waku_lightpush_legacy/client, + ] + +proc lightpushPublish*( + self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage +): Future[Result[string, string]] {.async.} = + ## Selects a lightpush service peer and publishes; returns the message hash. + try: + if self.node.wakuLegacyLightpushClient.isNil(): + return err("wakuLegacyLightpushClient is not mounted") + + let remotePeer = self.node.peerManager.selectPeer(WakuLightPushCodec).valueOr: + return err("failed to lightpublish message, no suitable remote peers") + + let msgHashHex = ( + await self.node.wakuLegacyLightpushClient.publish( + pubsubTopic, message, remotePeer + ) + ).valueOr: + return err($error) + + return ok(msgHashHex) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/api/peer_manager.nim b/logos_delivery/waku/api/peer_manager.nim new file mode 100644 index 000000000..26de46594 --- /dev/null +++ b/logos_delivery/waku/api/peer_manager.nim @@ -0,0 +1,112 @@ +## Waku layer API — peer management operations. +{.push raises: [].} + +import std/[options, sequtils, strutils] +import results, chronos, chronicles +import libp2p/[peerid, peerstore] + +import logos_delivery/waku/waku +import logos_delivery/waku/[waku_core, node/waku_node, node/peer_manager] + +type PeerConnInfo* = object ## structured connected-peer info for the api boundary + peerId*: string + protocols*: seq[string] + addresses*: seq[string] + +proc connect*( + self: Waku, peers: seq[string], timeoutMs: uint32 +): Future[Result[bool, string]] {.async.} = + try: + await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static") + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc disconnectPeerById*( + self: Waku, peerId: string +): Future[Result[bool, string]] {.async.} = + try: + let pId = PeerId.init(peerId).valueOr: + return err($error) + await self.node.peerManager.disconnectNode(pId) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} = + try: + await self.node.peerManager.disconnectAllPeers() + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc dialPeer*( + self: Waku, peerAddr: string, protocol: string, timeoutMs: int +): Future[Result[bool, string]] {.async.} = + try: + let remotePeerInfo = parsePeerInfo(peerAddr).valueOr: + return err($error) + let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol) + if conn.isNone(): + return err("failed dialing peer") + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc dialPeerById*( + self: Waku, peerId: string, protocol: string, timeoutMs: int +): Future[Result[bool, string]] {.async.} = + try: + let pId = PeerId.init(peerId).valueOr: + return err($error) + let conn = await self.node.peerManager.dialPeer(pId, protocol) + if conn.isNone(): + return err("failed dialing peer") + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId)) + except CatchableError as e: + return err(e.msg) + +proc connectedPeersInfo*( + self: Waku +): Future[Result[seq[PeerConnInfo], string]] {.async.} = + ## Structured info (protocols, addresses) for every connected peer. + try: + var infos: seq[PeerConnInfo] + for peer in self.node.peerManager.switch.peerStore.peers(): + if peer.connectedness == Connected: + infos.add( + PeerConnInfo( + peerId: $peer.peerId, + protocols: peer.protocols, + addresses: peer.addrs.mapIt($it), + ) + ) + return ok(infos) + except CatchableError as e: + return err(e.msg) + +proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers() + return ok(concat(inPeerIds, outPeerIds).mapIt($it)) + except CatchableError as e: + return err(e.msg) + +proc peerIdsByProtocol*( + self: Waku, protocol: string +): Future[Result[seq[string], string]] {.async.} = + try: + return ok( + self.node.peerManager.switch.peerStore + .peers(protocol) + .filterIt(it.connectedness == Connected) + .mapIt($it.peerId) + ) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/api/ping.nim b/logos_delivery/waku/api/ping.nim new file mode 100644 index 000000000..9cf2dfc33 --- /dev/null +++ b/logos_delivery/waku/api/ping.nim @@ -0,0 +1,45 @@ +## Waku layer API — ping operation. +{.push raises: [].} + +import results, chronos, chronicles +import libp2p/protocols/ping +import libp2p/switch + +import logos_delivery/waku/waku +import logos_delivery/waku/[waku_core, node/waku_node, node/waku_node/ping] + +proc pingPeer*( + self: Waku, peerAddr: string, timeoutMs: int +): Future[Result[int64, string]] {.async.} = + ## Pings the peer; `timeoutMs <= 0` means no timeout. Returns RTT in nanos. + try: + let peerInfo = parsePeerInfo(peerAddr).valueOr: + return err("pingPeer failed to parse peer addr: " & $error) + + proc doPing(): Future[Result[Duration, string]] {.async.} = + try: + let conn = + await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec) + defer: + await conn.close() + let rtt = await self.node.libp2pPing.ping(conn) + if rtt == 0.nanos: + return err("could not ping peer: rtt-0") + return ok(rtt) + except CatchableError as e: + return err("could not ping peer: " & e.msg) + + let pingFut = doPing() + let rtt: Duration = + if timeoutMs <= 0: + (await pingFut).valueOr: + return err(error) + else: + if not await pingFut.withTimeout(chronos.milliseconds(timeoutMs)): + return err("ping timed out") + pingFut.read().valueOr: + return err(error) + + return ok(rtt.nanos) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/api/relay.nim b/logos_delivery/waku/api/relay.nim new file mode 100644 index 000000000..930221300 --- /dev/null +++ b/logos_delivery/waku/api/relay.nim @@ -0,0 +1,132 @@ +## Waku layer API — relay (gossipsub) operations. +{.push raises: [].} + +import std/sequtils +import results, chronos, chronicles, secp256k1, stew/byteutils + +import logos_delivery/waku/waku +import + logos_delivery/waku/[ + waku_core, + node/waku_node, + node/waku_node/relay, + node/subscription_manager, + waku_relay/protocol, + factory/waku_conf, + factory/validator_signed, + ] + +proc relayPublish*( + self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32 +): Future[Result[string, string]] {.async.} = + ## Publishes `message` and returns its message hash (0x-hex). + try: + if self.node.wakuRelay.isNil(): + return err("relayPublish: WakuRelay not mounted") + + (await self.node.wakuRelay.publish(pubsubTopic, message)).isOkOr: + return err($error) + + return ok(computeMessageHash(pubsubTopic, message).to0xHex) + except CatchableError as e: + return err(e.msg) + +proc relaySubscribe*( + self: Waku, + pubsubTopic: PubsubTopic, + handler: WakuRelayHandler = WakuRelayHandler(nil), +): Future[Result[bool, string]] {.async.} = + ## Subscribes to `pubsubTopic`. `handler` (optional) is invoked per message; + ## pass nil to subscribe without a message callback. + try: + if self.node.wakuRelay.isNil(): + return err("relaySubscribe: WakuRelay not mounted") + + self.node.subscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), handler).isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc relayUnsubscribe*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayUnsubscribe: WakuRelay not mounted") + + self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc relayAddProtectedShard*( + self: Waku, clusterId: uint16, shardId: uint16, publicKey: string +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayAddProtectedShard: WakuRelay not mounted") + + let pubKey = SkPublicKey.fromHex(publicKey).valueOr: + return err("relayAddProtectedShard: invalid public key: " & $error) + + let protectedShard = ProtectedShard(shard: shardId, key: pubKey) + self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc relayConnectedPeers*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[seq[string], string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayConnectedPeers: WakuRelay not mounted") + + let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr: + return err($error) + + return ok(connPeers.mapIt($it)) + except CatchableError as e: + return err(e.msg) + +proc relayPeersInMesh*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[seq[string], string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayPeersInMesh: WakuRelay not mounted") + + let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr: + return err($error) + + return ok(meshPeers.mapIt($it)) + except CatchableError as e: + return err(e.msg) + +proc relayNumPeersInMesh*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[int, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayNumPeersInMesh: WakuRelay not mounted") + let n = self.node.wakuRelay.getNumPeersInMesh(pubsubTopic).valueOr: + return err($error) + return ok(n) + except CatchableError as e: + return err(e.msg) + +proc relayNumConnectedPeers*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[int, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayNumConnectedPeers: WakuRelay not mounted") + let n = self.node.wakuRelay.getNumConnectedPeers(pubsubTopic).valueOr: + return err($error) + return ok(n) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/api/store.nim b/logos_delivery/waku/api/store.nim new file mode 100644 index 000000000..5095dbb0d --- /dev/null +++ b/logos_delivery/waku/api/store.nim @@ -0,0 +1,29 @@ +## Waku layer API — store (historical query) operations. +{.push raises: [].} + +import results, chronos, chronicles + +import logos_delivery/waku/waku +import + logos_delivery/waku/[waku_core, node/waku_node, waku_store/common, waku_store/client] + +proc storeQuery*( + self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int +): Future[Result[StoreQueryResponse, string]] {.async.} = + try: + if self.node.wakuStoreClient.isNil(): + return err("wakuStoreClient is not mounted") + + let remotePeer = parsePeerInfo(peer).valueOr: + return err("storeQuery failed to parse peer addr: " & $error) + + let queryFut = self.node.wakuStoreClient.query(request, remotePeer) + if not await queryFut.withTimeout(timeoutMs.milliseconds): + return err("storeQuery timed out") + + let queryResponse = queryFut.read().valueOr: + return err("storeQuery failed: " & $error) + + return ok(queryResponse) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/api/topics.nim b/logos_delivery/waku/api/topics.nim new file mode 100644 index 000000000..223291169 --- /dev/null +++ b/logos_delivery/waku/api/topics.nim @@ -0,0 +1,27 @@ +## Waku layer API — topic construction. +{.push raises: [].} + +import std/strformat +import results, chronos + +import logos_delivery/waku/waku +import logos_delivery/waku/waku_core + +proc buildContentTopic*( + self: Waku, appName: string, appVersion: uint32, name: string, encoding: string +): Future[Result[ContentTopic, string]] {.async.} = + try: + return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}")) + except CatchableError as e: + return err(e.msg) + +proc buildPubsubTopic*( + self: Waku, topicName: string +): Future[Result[PubsubTopic, string]] {.async.} = + try: + return ok(PubsubTopic(fmt"/waku/2/{topicName}")) + except CatchableError as e: + return err(e.msg) + +proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} = + return ok(DefaultPubsubTopic) diff --git a/logos_delivery/waku/discovery/waku_kademlia.nim b/logos_delivery/waku/discovery/waku_kademlia.nim index 9b0e43abc..e6775530c 100644 --- a/logos_delivery/waku/discovery/waku_kademlia.nim +++ b/logos_delivery/waku/discovery/waku_kademlia.nim @@ -21,7 +21,7 @@ import import logos_delivery/waku/waku_core, logos_delivery/waku/node/peer_manager, - logos_delivery/waku/events/discovery_events + logos_delivery/waku/api/events/discovery_events logScope: topics = "waku service discovery" diff --git a/logos_delivery/waku/events/events.nim b/logos_delivery/waku/events/events.nim deleted file mode 100644 index 130d7c018..000000000 --- a/logos_delivery/waku/events/events.nim +++ /dev/null @@ -1,9 +0,0 @@ -import - ./[ - message_events, delivery_events, health_events, peer_events, lifecycle_events, - discovery_events, - ] - -export - message_events, delivery_events, health_events, peer_events, lifecycle_events, - discovery_events diff --git a/logos_delivery/waku/factory/app_callbacks.nim b/logos_delivery/waku/factory/app_callbacks.nim index f1d3369be..0945b56bf 100644 --- a/logos_delivery/waku/factory/app_callbacks.nim +++ b/logos_delivery/waku/factory/app_callbacks.nim @@ -1,5 +1,9 @@ import ../waku_relay, ../node/peer_manager, ../node/health_monitor/connection_status +# Re-export the modules that define the handler types below, so that consumers +# of `AppCallbacks` (e.g. the FFI library) can construct the handlers. +export waku_relay, peer_manager, connection_status + type AppCallbacks* = ref object relayHandler*: WakuRelayHandler topicHealthChangeHandler*: TopicHealthChangeHandler diff --git a/logos_delivery/waku/factory/node_factory.nim b/logos_delivery/waku/factory/node_factory.nim index 30e37850a..c4f5ab32d 100644 --- a/logos_delivery/waku/factory/node_factory.nim +++ b/logos_delivery/waku/factory/node_factory.nim @@ -39,7 +39,7 @@ import ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, ../waku_lightpush_legacy/common, ../common/rate_limit/setting, - ../events/discovery_events + ../api/events/discovery_events ## Peer persistence diff --git a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim index 465598794..d55499d1a 100644 --- a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim +++ b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim @@ -12,8 +12,8 @@ import logos_delivery/waku/[ waku_relay, waku_rln_relay, - events/health_events, - events/peer_events, + api/events/health_events, + api/events/peer_events, node/waku_node, node/node_telemetry, node/peer_manager, diff --git a/logos_delivery/waku/node/peer_manager/peer_manager.nim b/logos_delivery/waku/node/peer_manager/peer_manager.nim index 60bd2acb1..31c6133ab 100644 --- a/logos_delivery/waku/node/peer_manager/peer_manager.nim +++ b/logos_delivery/waku/node/peer_manager/peer_manager.nim @@ -20,7 +20,7 @@ import waku_relay/protocol, waku_enr/sharding, waku_enr/capabilities, - events/peer_events, + api/events/peer_events, common/nimchronos, common/enr, common/callbacks, diff --git a/logos_delivery/waku/node/subscription_manager.nim b/logos_delivery/waku/node/subscription_manager.nim index 15b582ea6..69f67ab3b 100644 --- a/logos_delivery/waku/node/subscription_manager.nim +++ b/logos_delivery/waku/node/subscription_manager.nim @@ -15,9 +15,9 @@ import waku_filter_v2/common as filter_common, waku_filter_v2/client as filter_client, waku_filter_v2/protocol as filter_protocol, - events/health_events, - events/message_events, - events/peer_events, + api/events/health_events, + api/events/message_events, + api/events/peer_events, requests/health_requests, node/peer_manager, node/health_monitor/topic_health, diff --git a/logos_delivery/waku/node/waku_node.nim b/logos_delivery/waku/node/waku_node.nim index 2ad7dc601..a329226cc 100644 --- a/logos_delivery/waku/node/waku_node.nim +++ b/logos_delivery/waku/node/waku_node.nim @@ -59,9 +59,9 @@ import waku_mix, requests/node_requests, requests/health_requests, - events/health_events, - events/message_events, - events/peer_events, + api/events/health_events, + api/events/message_events, + api/events/peer_events, ], logos_delivery/waku/discovery/waku_kademlia, logos_delivery/waku/net/[bound_ports, net_config], diff --git a/logos_delivery/waku/node/waku_node/relay.nim b/logos_delivery/waku/node/waku_node/relay.nim index 57904dc94..f2a2772e0 100644 --- a/logos_delivery/waku/node/waku_node/relay.nim +++ b/logos_delivery/waku/node/waku_node/relay.nim @@ -32,7 +32,7 @@ import node/waku_node, node/subscription_manager, node/peer_manager, - events/message_events, + api/events/message_events, ] export waku_relay.WakuRelayHandler diff --git a/logos_delivery/waku/waku.nim b/logos_delivery/waku/waku.nim index 2c18c5f63..4669deabd 100644 --- a/logos_delivery/waku/waku.nim +++ b/logos_delivery/waku/waku.nim @@ -63,8 +63,6 @@ logScope: # Git version in git describe format (defined at compile time) const git_version* {.strdefine.} = "n/a" -const FilterOpTimeout = 5.seconds - type Waku* = ref object stateInfo*: WakuStateInfo conf*: WakuConf @@ -574,418 +572,4 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = return ok() -## Kernel API realization -## -# --- topic construction --- -proc buildContentTopic*( - self: Waku, appName: string, appVersion: uint32, name: string, encoding: string -): Future[Result[ContentTopic, string]] {.async.} = - try: - return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}")) - except CatchableError as e: - return err(e.msg) - -proc buildPubsubTopic*( - self: Waku, topicName: string -): Future[Result[PubsubTopic, string]] {.async.} = - try: - return ok(PubsubTopic(fmt"/waku/2/{topicName}")) - except CatchableError as e: - return err(e.msg) - -proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} = - return ok(DefaultPubsubTopic) - -# --- relay --- -proc relayPublish*( - self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32 -): Future[Result[int, string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayPublish: WakuRelay not mounted") - - let numPeers = (await self.node.wakuRelay.publish(pubsubTopic, message)).valueOr: - return err($error) - - return ok(numPeers) - except CatchableError as e: - return err(e.msg) - -proc relaySubscribe*( - self: Waku, pubsubTopic: PubsubTopic -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relaySubscribe: WakuRelay not mounted") - - self.node.subscribe( - (kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(nil) - ).isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc relayUnsubscribe*( - self: Waku, pubsubTopic: PubsubTopic -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayUnsubscribe: WakuRelay not mounted") - - self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc relayAddProtectedShard*( - self: Waku, clusterId: uint16, shardId: uint16, publicKey: string -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayAddProtectedShard: WakuRelay not mounted") - - let pubKey = SkPublicKey.fromHex(publicKey).valueOr: - return err("relayAddProtectedShard: invalid public key: " & $error) - - let protectedShard = ProtectedShard(shard: shardId, key: pubKey) - self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId) - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc relayConnectedPeers*( - self: Waku, pubsubTopic: PubsubTopic -): Future[Result[seq[string], string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayConnectedPeers: WakuRelay not mounted") - - let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr: - return err($error) - - return ok(connPeers.mapIt($it)) - except CatchableError as e: - return err(e.msg) - -proc relayPeersInMesh*( - self: Waku, pubsubTopic: PubsubTopic -): Future[Result[seq[string], string]] {.async.} = - try: - if self.node.wakuRelay.isNil(): - return err("relayPeersInMesh: WakuRelay not mounted") - - let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr: - return err($error) - - return ok(meshPeers.mapIt($it)) - except CatchableError as e: - return err(e.msg) - -# --- filter --- -proc filterSubscribe*( - self: Waku, - pubsubTopic: Option[PubsubTopic], - contentTopics: seq[ContentTopic], - peer: string, -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuFilterClient.isNil(): - return err("wakuFilterClient is not mounted") - - let subFut = self.node.filterSubscribe(pubsubTopic, contentTopics, peer) - if not await subFut.withTimeout(FilterOpTimeout): - return err("filter subscription timed out") - subFut.read().isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc filterUnsubscribe*( - self: Waku, - pubsubTopic: Option[PubsubTopic], - contentTopics: seq[ContentTopic], - peer: string, -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuFilterClient.isNil(): - return err("wakuFilterClient is not mounted") - - let unsubFut = self.node.filterUnsubscribe(pubsubTopic, contentTopics, peer) - if not await unsubFut.withTimeout(FilterOpTimeout): - return err("filter un-subscription timed out") - unsubFut.read().isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc filterUnsubscribeAll*( - self: Waku, peer: string -): Future[Result[bool, string]] {.async.} = - try: - if self.node.wakuFilterClient.isNil(): - return err("wakuFilterClient is not mounted") - - let unsubFut = self.node.filterUnsubscribeAll(peer) - if not await unsubFut.withTimeout(FilterOpTimeout): - return err("filter un-subscription all timed out") - unsubFut.read().isOkOr: - return err($error) - - return ok(true) - except CatchableError as e: - return err(e.msg) - -# --- lightpush --- -proc lightpushPublish*( - self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string -): Future[Result[string, string]] {.async.} = - try: - if self.node.wakuLegacyLightpushClient.isNil(): - return err("wakuLegacyLightpushClient is not mounted") - - let remotePeer = parsePeerInfo(peer).valueOr: - return err("lightpushPublish failed to parse peer addr: " & $error) - - let msgHashHex = ( - await self.node.wakuLegacyLightpushClient.publish( - pubsubTopic, message, remotePeer - ) - ).valueOr: - return err($error) - - return ok(msgHashHex) - except CatchableError as e: - return err(e.msg) - -# --- store --- -proc storeQuery*( - self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int -): Future[Result[StoreQueryResponse, string]] {.async.} = - try: - if self.node.wakuStoreClient.isNil(): - return err("wakuStoreClient is not mounted") - - let remotePeer = parsePeerInfo(peer).valueOr: - return err("storeQuery failed to parse peer addr: " & $error) - - let queryFut = self.node.wakuStoreClient.query(request, remotePeer) - if not await queryFut.withTimeout(timeoutMs.milliseconds): - return err("storeQuery timed out") - - let queryResponse = queryFut.read().valueOr: - return err("storeQuery failed: " & $error) - - return ok(queryResponse) - except CatchableError as e: - return err(e.msg) - -# --- peer management --- -proc connect*( - self: Waku, peers: seq[string], timeoutMs: uint32 -): Future[Result[bool, string]] {.async.} = - try: - await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static") - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc disconnectPeerById*( - self: Waku, peerId: string -): Future[Result[bool, string]] {.async.} = - try: - let pId = PeerId.init(peerId).valueOr: - return err($error) - await self.node.peerManager.disconnectNode(pId) - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} = - try: - await self.node.peerManager.disconnectAllPeers() - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc dialPeer*( - self: Waku, peerAddr: string, protocol: string, timeoutMs: int -): Future[Result[bool, string]] {.async.} = - try: - let remotePeerInfo = parsePeerInfo(peerAddr).valueOr: - return err($error) - let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol) - if conn.isNone(): - return err("failed dialing peer") - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc dialPeerById*( - self: Waku, peerId: string, protocol: string, timeoutMs: int -): Future[Result[bool, string]] {.async.} = - try: - let pId = PeerId.init(peerId).valueOr: - return err($error) - let conn = await self.node.peerManager.dialPeer(pId, protocol) - if conn.isNone(): - return err("failed dialing peer") - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} = - try: - return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId)) - except CatchableError as e: - return err(e.msg) - -proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.async.} = - try: - return ok( - self.node.peerManager.switch.peerStore - .peers() - .filterIt(it.connectedness == Connected) - .mapIt($it.peerId) - ) - except CatchableError as e: - return err(e.msg) - -proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} = - try: - let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers() - return ok(concat(inPeerIds, outPeerIds).mapIt($it)) - except CatchableError as e: - return err(e.msg) - -proc peerIdsByProtocol*( - self: Waku, protocol: string -): Future[Result[seq[string], string]] {.async.} = - try: - return ok( - self.node.peerManager.switch.peerStore - .peers(protocol) - .filterIt(it.connectedness == Connected) - .mapIt($it.peerId) - ) - except CatchableError as e: - return err(e.msg) - -# --- discovery --- -proc dnsDiscovery*( - self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int -): Future[Result[seq[string], string]] {.async.} = - try: - let dnsNameServers = @[parseIpAddress(nameServer)] - let discoveredPeers = ( - await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers) - ).valueOr: - return err("failed discovering peers from DNS: " & $error) - - var multiAddresses = newSeq[string]() - for discPeer in discoveredPeers: - for address in discPeer.addrs: - multiAddresses.add($address & "/p2p/" & $discPeer) - - return ok(multiAddresses) - except CatchableError as e: - return err(e.msg) - -proc discv5UpdateBootnodes*( - self: Waku, bootnodes: seq[string] -): Future[Result[bool, string]] {.async.} = - try: - if self.wakuDiscv5.isNil(): - return err("discv5 not started") - let jsonArray = "[" & bootnodes.mapIt("\"" & it & "\"").join(",") & "]" - self.wakuDiscv5.updateBootstrapRecords(jsonArray).isOkOr: - return err("error in discv5UpdateBootnodes: " & $error) - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = - try: - if self.wakuDiscv5.isNil(): - return err("discv5 not started") - (await self.wakuDiscv5.start()).isOkOr: - return err("error starting discv5: " & $error) - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = - try: - if self.wakuDiscv5.isNil(): - return err("discv5 not started") - await self.wakuDiscv5.stop() - return ok(true) - except CatchableError as e: - return err(e.msg) - -proc peerExchangeRequest*( - self: Waku, numPeers: uint64 -): Future[Result[int, string]] {.async.} = - try: - let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr: - return err("failed peer exchange: " & $error) - return ok(numPeersRecv) - except CatchableError as e: - return err(e.msg) - -# --- debug / info --- -proc version*(self: Waku): Future[Result[string, string]] {.async.} = - return ok(WakuNodeVersionString) - -proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} = - try: - return ok(self.node.info().listenAddresses) - except CatchableError as e: - return err(e.msg) - -proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} = - try: - return ok(self.node.enr.toURI()) - except CatchableError as e: - return err(e.msg) - -proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} = - try: - return ok($self.node.peerId()) - except CatchableError as e: - return err(e.msg) - -proc metrics*(self: Waku): Future[Result[string, string]] {.async.} = - {.gcsafe.}: - try: - return ok(defaultRegistry.toText()) - except CatchableError as e: - return err(e.msg) - -proc pingPeer*( - self: Waku, peerAddr: string, timeoutMs: int -): Future[Result[int64, string]] {.async.} = - try: - let peerInfo = parsePeerInfo(peerAddr).valueOr: - return err("pingPeer failed to parse peer addr: " & $error) - - let conn = await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec) - defer: - await conn.close() - let pingRTT = await self.node.libp2pPing.ping(conn) - - if pingRTT == 0.nanos: - return err("could not ping peer: rtt-0") - - return ok(pingRTT.nanos) - except CatchableError as e: - return err(e.msg) - {.pop.} diff --git a/logos_delivery/waku/waku_filter_v2/client.nim b/logos_delivery/waku/waku_filter_v2/client.nim index ddae363a3..51ed18af3 100644 --- a/logos_delivery/waku/waku_filter_v2/client.nim +++ b/logos_delivery/waku/waku_filter_v2/client.nim @@ -14,7 +14,7 @@ import brokers/broker_context import - logos_delivery/waku/[node/peer_manager, waku_core, events/delivery_events], + logos_delivery/waku/[node/peer_manager, waku_core, api/events/filter_subscribe_events], ./common, ./protocol_metrics, ./rpc_codec, diff --git a/logos_delivery/waku/waku_relay/protocol.nim b/logos_delivery/waku/waku_relay/protocol.nim index e677ec5a0..409b02929 100644 --- a/logos_delivery/waku/waku_relay/protocol.nim +++ b/logos_delivery/waku/waku_relay/protocol.nim @@ -24,9 +24,9 @@ import logos_delivery/waku/waku_core, logos_delivery/waku/node/health_monitor/topic_health, logos_delivery/waku/requests/health_requests, - logos_delivery/waku/events/health_events, + logos_delivery/waku/api/events/health_events, ./message_id, - logos_delivery/waku/events/peer_events + logos_delivery/waku/api/events/peer_events from logos_delivery/waku/waku_core/codecs import WakuRelayCodec export WakuRelayCodec diff --git a/tests/api/test_api_health.nim b/tests/api/test_api_health.nim index 83efedd18..27030d432 100644 --- a/tests/api/test_api_health.nim +++ b/tests/api/test_api_health.nim @@ -12,7 +12,7 @@ import [topic_health, health_status, protocol_health, health_report], logos_delivery/waku/requests/health_requests, logos_delivery/waku/requests/node_requests, - logos_delivery/waku/events/health_events, + logos_delivery/waku/api/events/health_events, logos_delivery/waku/common/waku_protocol, logos_delivery/waku/factory/waku_conf import tools/confutils/cli_args diff --git a/tests/api/test_api_receive.nim b/tests/api/test_api_receive.nim index 41c0f0477..b1e7635d8 100644 --- a/tests/api/test_api_receive.nim +++ b/tests/api/test_api_receive.nim @@ -14,8 +14,7 @@ import logos_delivery/waku/[ waku_node, waku_core, - events/message_events, - events/health_events, + api/events/health_events, waku_relay/protocol, waku_archive, waku_archive/common as archive_common, diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim index bf2851b02..ca9d2c9b3 100644 --- a/tests/api/test_api_subscription.nim +++ b/tests/api/test_api_subscription.nim @@ -12,7 +12,6 @@ import logos_delivery/waku/[ waku_node, waku_core, - events/message_events, waku_relay/protocol, node/waku_node/filter, node/subscription_manager, diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index b3d9a8f00..17ceb56ad 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -10,7 +10,7 @@ import ../testlib/[common, wakucore, wakunode, testasync] import logos_delivery import logos_delivery/waku/[waku_node, waku_core] import logos_delivery/waku/factory/waku_conf -import logos_delivery/waku/events/message_events as waku_message_events +import logos_delivery/messaging/api/events as waku_message_events import tools/confutils/cli_args import logos_delivery/channels/reliable_channel_manager diff --git a/tests/node/test_wakunode_health_monitor.nim b/tests/node/test_wakunode_health_monitor.nim index 04a39455a..acf0bc09a 100644 --- a/tests/node/test_wakunode_health_monitor.nim +++ b/tests/node/test_wakunode_health_monitor.nim @@ -19,8 +19,8 @@ import node/waku_node/store, node/waku_node/lightpush, node/waku_node/filter, - events/health_events, - events/peer_events, + api/events/health_events, + api/events/peer_events, waku_archive, ]