mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-06-27 20:10:02 +00:00
Reshape per-layer API into api/ folders and thin the FFI over them
Each layer now separates its constructible core from its public surface:
- core module (waku.nim / messaging_client.nim /
reliable_channel_manager.nim): the type plus new/start/stop and the
private construction helpers.
- api/ folder: one module per differentiated set of operations
(waku: topics/relay/filter/lightpush/store/peer_manager/discovery/
debug/health) plus an events surface.
The waku api is reshaped to be the complete operation surface the C
bindings need, so the library no longer reaches into node internals:
relayPublish returns the message hash, relaySubscribe takes an optional
handler, filter/lightpush auto-select the service peer, connectedPeersInfo
returns structured data, pingPeer honours the timeout, plus
relayNumPeersInMesh / relayNumConnectedPeers / isOnline. library/ is now a
thin C-ABI shim: each {.ffi.} proc only marshals cstring/JSON/callbacks and
delegates to ctx.myLib[].waku.<op> (or messagingClient.<op>).
app_callbacks re-exports the modules defining its handler types, which the
included FFI files previously relied on by leakage.
Events move next to the surface that owns them, with each dependency kept
pointing the right way:
- waku/events/ relocated under waku/api/events/.
- channel events live in channels/api/events.nim.
- the four messaging-level message events move to messaging/api/events;
MessageSeenEvent stays in waku because it is emitted by waku core, so
moving it would make waku depend on the messaging layer.
- delivery_events renamed to filter_subscribe_events to match the
OnFilterSubscribe/Unsubscribe events it actually declares.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
6d35800fce
commit
aca652008a
@ -1,53 +1,46 @@
|
||||
import std/json
|
||||
import
|
||||
chronicles,
|
||||
chronos,
|
||||
results,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
strutils,
|
||||
libp2p/peerid,
|
||||
metrics,
|
||||
ffi
|
||||
import
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/node/waku_node,
|
||||
logos_delivery/waku/node/health_monitor,
|
||||
library/declare_lib
|
||||
|
||||
proc getMultiaddresses(node: WakuNode): seq[string] =
|
||||
return node.info().listenAddresses
|
||||
|
||||
proc getMetrics(): string =
|
||||
{.gcsafe.}:
|
||||
return defaultRegistry.toText() ## defaultRegistry is {.global.} in metrics module
|
||||
import std/strutils
|
||||
import chronos, results, ffi
|
||||
import logos_delivery, library/declare_lib
|
||||
|
||||
proc waku_version(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
return ok(WakuNodeVersionString)
|
||||
let v = (await ctx.myLib[].waku.version()).valueOr:
|
||||
return err(error)
|
||||
return ok(v)
|
||||
|
||||
proc waku_listen_addresses(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
## returns a comma-separated string of the listen addresses
|
||||
return ok(ctx.myLib[].waku.node.getMultiaddresses().join(","))
|
||||
let addrs = (await ctx.myLib[].waku.listenAddresses()).valueOr:
|
||||
return err(error)
|
||||
return ok(addrs.join(","))
|
||||
|
||||
proc waku_get_my_enr(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
return ok(ctx.myLib[].waku.node.enr.toURI())
|
||||
let enrUri = (await ctx.myLib[].waku.myEnr()).valueOr:
|
||||
return err(error)
|
||||
return ok(enrUri)
|
||||
|
||||
proc waku_get_my_peerid(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
return ok($ctx.myLib[].waku.node.peerId())
|
||||
let peerId = (await ctx.myLib[].waku.myPeerId()).valueOr:
|
||||
return err(error)
|
||||
return ok(peerId)
|
||||
|
||||
proc waku_get_metrics(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
return ok(getMetrics())
|
||||
let m = (await ctx.myLib[].waku.metrics()).valueOr:
|
||||
return err(error)
|
||||
return ok(m)
|
||||
|
||||
proc waku_is_online(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
return ok($ctx.myLib[].waku.healthMonitor.onlineMonitor.amIOnline())
|
||||
let online = (await ctx.myLib[].waku.isOnline()).valueOr:
|
||||
return err(error)
|
||||
return ok($online)
|
||||
|
||||
@ -1,42 +1,6 @@
|
||||
import logos_delivery/waku/compat/option_valueor
|
||||
import std/json
|
||||
import chronos, chronicles, results, strutils, libp2p/multiaddress, ffi
|
||||
import
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/discovery/waku_dnsdisc,
|
||||
logos_delivery/waku/discovery/waku_discv5,
|
||||
logos_delivery/waku/waku_core/peers,
|
||||
logos_delivery/waku/waku_node,
|
||||
library/declare_lib
|
||||
|
||||
proc retrieveBootstrapNodes(
|
||||
enrTreeUrl: string, ipDnsServer: string
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
let dnsNameServers = @[parseIpAddress(ipDnsServer)]
|
||||
let discoveredPeers: seq[RemotePeerInfo] = (
|
||||
await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers)
|
||||
).valueOr:
|
||||
return err("failed discovering peers from DNS: " & $error)
|
||||
|
||||
var multiAddresses = newSeq[string]()
|
||||
|
||||
for discPeer in discoveredPeers:
|
||||
for address in discPeer.addrs:
|
||||
multiAddresses.add($address & "/p2p/" & $discPeer)
|
||||
|
||||
return ok(multiAddresses)
|
||||
|
||||
proc updateDiscv5BootstrapNodes(nodes: string, waku: Waku): Result[void, string] =
|
||||
waku.wakuDiscv5.updateBootstrapRecords(nodes).isOkOr:
|
||||
return err("error in updateDiscv5BootstrapNodes: " & $error)
|
||||
return ok()
|
||||
|
||||
proc performPeerExchangeRequestTo*(
|
||||
numPeers: uint64, waku: Waku
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
let numPeersRecv = (await waku.node.fetchPeerExchangePeers(numPeers)).valueOr:
|
||||
return err($error)
|
||||
return ok(numPeersRecv)
|
||||
import std/strutils
|
||||
import chronos, chronicles, results, ffi
|
||||
import logos_delivery, library/declare_lib
|
||||
|
||||
proc waku_discv5_update_bootnodes(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
@ -46,11 +10,9 @@ proc waku_discv5_update_bootnodes(
|
||||
) {.ffi.} =
|
||||
## Updates the bootnode list used for discovering new peers via DiscoveryV5
|
||||
## bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
|
||||
|
||||
updateDiscv5BootstrapNodes($bootnodes, ctx.myLib[].waku).isOkOr:
|
||||
(await ctx.myLib[].waku.discv5UpdateBootnodes($bootnodes)).isOkOr:
|
||||
error "UPDATE_DISCV5_BOOTSTRAP_NODES failed", error = error
|
||||
return err($error)
|
||||
|
||||
return err(error)
|
||||
return ok("discovery request processed correctly")
|
||||
|
||||
proc waku_dns_discovery(
|
||||
@ -61,26 +23,28 @@ proc waku_dns_discovery(
|
||||
nameDnsServer: cstring,
|
||||
timeoutMs: cint,
|
||||
) {.ffi.} =
|
||||
let nodes = (await retrieveBootstrapNodes($enrTreeUrl, $nameDnsServer)).valueOr:
|
||||
let nodes = (
|
||||
await ctx.myLib[].waku.dnsDiscovery($enrTreeUrl, $nameDnsServer, int(timeoutMs))
|
||||
).valueOr:
|
||||
error "GET_BOOTSTRAP_NODES failed", error = error
|
||||
return err($error)
|
||||
|
||||
return err(error)
|
||||
## returns a comma-separated string of bootstrap nodes' multiaddresses
|
||||
return ok(nodes.join(","))
|
||||
|
||||
proc waku_start_discv5(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
(await ctx.myLib[].waku.wakuDiscv5.start()).isOkOr:
|
||||
(await ctx.myLib[].waku.startDiscv5()).isOkOr:
|
||||
error "START_DISCV5 failed", error = error
|
||||
return err("error starting discv5: " & $error)
|
||||
|
||||
return err(error)
|
||||
return ok("discv5 started correctly")
|
||||
|
||||
proc waku_stop_discv5(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
await ctx.myLib[].waku.wakuDiscv5.stop()
|
||||
(await ctx.myLib[].waku.stopDiscv5()).isOkOr:
|
||||
error "STOP_DISCV5 failed", error = error
|
||||
return err(error)
|
||||
return ok("discv5 stopped correctly")
|
||||
|
||||
proc waku_peer_exchange_request(
|
||||
@ -89,8 +53,7 @@ proc waku_peer_exchange_request(
|
||||
userData: pointer,
|
||||
numPeers: uint64,
|
||||
) {.ffi.} =
|
||||
let numValidPeers = (await performPeerExchangeRequestTo(numPeers, ctx.myLib[].waku)).valueOr:
|
||||
let numValidPeers = (await ctx.myLib[].waku.peerExchangeRequest(numPeers)).valueOr:
|
||||
error "waku_peer_exchange_request failed", error = error
|
||||
return err("failed peer exchange: " & $error)
|
||||
|
||||
return err(error)
|
||||
return ok($numValidPeers)
|
||||
|
||||
@ -1,11 +1,6 @@
|
||||
import logos_delivery/waku/compat/option_valueor
|
||||
import std/[sequtils, strutils, tables]
|
||||
import chronicles, chronos, results, options, json, ffi
|
||||
import
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/node/waku_node,
|
||||
logos_delivery/waku/node/peer_manager,
|
||||
library/declare_lib
|
||||
import std/[strutils, tables, json]
|
||||
import chronicles, chronos, results, ffi
|
||||
import logos_delivery, library/declare_lib
|
||||
|
||||
type PeerInfo = object
|
||||
protocols: seq[string]
|
||||
@ -15,11 +10,9 @@ proc waku_get_peerids_from_peerstore(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
## returns a comma-separated string of peerIDs
|
||||
let peerIDs = ctx.myLib[].waku.node.peerManager.switch.peerStore
|
||||
.peers()
|
||||
.mapIt($it.peerId)
|
||||
.join(",")
|
||||
return ok(peerIDs)
|
||||
let peerIds = (await ctx.myLib[].waku.peerIdsFromPeerstore()).valueOr:
|
||||
return err(error)
|
||||
return ok(peerIds.join(","))
|
||||
|
||||
proc waku_connect(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
@ -28,8 +21,9 @@ proc waku_connect(
|
||||
peerMultiAddr: cstring,
|
||||
timeoutMs: cuint,
|
||||
) {.ffi.} =
|
||||
let peers = ($peerMultiAddr).split(",").mapIt(strip(it))
|
||||
await ctx.myLib[].waku.node.connectToNodes(peers, source = "static")
|
||||
let peers = ($peerMultiAddr).split(",")
|
||||
(await ctx.myLib[].waku.connect(peers, uint32(timeoutMs))).isOkOr:
|
||||
return err(error)
|
||||
return ok("")
|
||||
|
||||
proc waku_disconnect_peer_by_id(
|
||||
@ -38,16 +32,16 @@ proc waku_disconnect_peer_by_id(
|
||||
userData: pointer,
|
||||
peerId: cstring,
|
||||
) {.ffi.} =
|
||||
let pId = PeerId.init($peerId).valueOr:
|
||||
error "DISCONNECT_PEER_BY_ID failed", error = $error
|
||||
return err($error)
|
||||
await ctx.myLib[].waku.node.peerManager.disconnectNode(pId)
|
||||
(await ctx.myLib[].waku.disconnectPeerById($peerId)).isOkOr:
|
||||
error "DISCONNECT_PEER_BY_ID failed", error = error
|
||||
return err(error)
|
||||
return ok("")
|
||||
|
||||
proc waku_disconnect_all_peers(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
await ctx.myLib[].waku.node.peerManager.disconnectAllPeers()
|
||||
(await ctx.myLib[].waku.disconnectAllPeers()).isOkOr:
|
||||
return err(error)
|
||||
return ok("")
|
||||
|
||||
proc waku_dial_peer(
|
||||
@ -58,14 +52,9 @@ proc waku_dial_peer(
|
||||
protocol: cstring,
|
||||
timeoutMs: cuint,
|
||||
) {.ffi.} =
|
||||
let remotePeerInfo = parsePeerInfo($peerMultiAddr).valueOr:
|
||||
error "DIAL_PEER failed", error = $error
|
||||
return err($error)
|
||||
let conn = await ctx.myLib[].waku.node.peerManager.dialPeer(remotePeerInfo, $protocol)
|
||||
if conn.isNone():
|
||||
let msg = "failed dialing peer"
|
||||
error "DIAL_PEER failed", error = msg, peerId = $remotePeerInfo.peerId
|
||||
return err(msg)
|
||||
(await ctx.myLib[].waku.dialPeer($peerMultiAddr, $protocol, int(timeoutMs))).isOkOr:
|
||||
error "DIAL_PEER failed", error = error
|
||||
return err(error)
|
||||
return ok("")
|
||||
|
||||
proc waku_dial_peer_by_id(
|
||||
@ -76,47 +65,32 @@ proc waku_dial_peer_by_id(
|
||||
protocol: cstring,
|
||||
timeoutMs: cuint,
|
||||
) {.ffi.} =
|
||||
let pId = PeerId.init($peerId).valueOr:
|
||||
error "DIAL_PEER_BY_ID failed", error = $error
|
||||
return err($error)
|
||||
let conn = await ctx.myLib[].waku.node.peerManager.dialPeer(pId, $protocol)
|
||||
if conn.isNone():
|
||||
let msg = "failed dialing peer"
|
||||
error "DIAL_PEER_BY_ID failed", error = msg, peerId = $peerId
|
||||
return err(msg)
|
||||
|
||||
(await ctx.myLib[].waku.dialPeerById($peerId, $protocol, int(timeoutMs))).isOkOr:
|
||||
error "DIAL_PEER_BY_ID failed", error = error
|
||||
return err(error)
|
||||
return ok("")
|
||||
|
||||
proc waku_get_connected_peers_info(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
## returns a JSON string mapping peerIDs to objects with protocols and addresses
|
||||
let peers = (await ctx.myLib[].waku.connectedPeersInfo()).valueOr:
|
||||
return err(error)
|
||||
|
||||
var peersMap = initTable[string, PeerInfo]()
|
||||
let peers = ctx.myLib[].waku.node.peerManager.switch.peerStore.peers().filterIt(
|
||||
it.connectedness == Connected
|
||||
)
|
||||
|
||||
# Build a map of peer IDs to peer info objects
|
||||
for peer in peers:
|
||||
let peerIdStr = $peer.peerId
|
||||
peersMap[peerIdStr] =
|
||||
PeerInfo(protocols: peer.protocols, addresses: peer.addrs.mapIt($it))
|
||||
peersMap[peer.peerId] =
|
||||
PeerInfo(protocols: peer.protocols, addresses: peer.addresses)
|
||||
|
||||
# Convert the map to JSON string
|
||||
let jsonObj = %*peersMap
|
||||
let jsonStr = $jsonObj
|
||||
return ok(jsonStr)
|
||||
return ok($(%*peersMap))
|
||||
|
||||
proc waku_get_connected_peers(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
## returns a comma-separated string of peerIDs
|
||||
let
|
||||
(inPeerIds, outPeerIds) = ctx.myLib[].waku.node.peerManager.connectedPeers()
|
||||
connectedPeerids = concat(inPeerIds, outPeerIds)
|
||||
|
||||
return ok(connectedPeerids.mapIt($it).join(","))
|
||||
let peerIds = (await ctx.myLib[].waku.connectedPeers()).valueOr:
|
||||
return err(error)
|
||||
return ok(peerIds.join(","))
|
||||
|
||||
proc waku_get_peerids_by_protocol(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
@ -125,9 +99,6 @@ proc waku_get_peerids_by_protocol(
|
||||
protocol: cstring,
|
||||
) {.ffi.} =
|
||||
## returns a comma-separated string of peerIDs that mount the given protocol
|
||||
let connectedPeers = ctx.myLib[].waku.node.peerManager.switch.peerStore
|
||||
.peers($protocol)
|
||||
.filterIt(it.connectedness == Connected)
|
||||
.mapIt($it.peerId)
|
||||
.join(",")
|
||||
return ok(connectedPeers)
|
||||
let peerIds = (await ctx.myLib[].waku.peerIdsByProtocol($protocol)).valueOr:
|
||||
return err(error)
|
||||
return ok(peerIds.join(","))
|
||||
|
||||
@ -1,7 +1,5 @@
|
||||
import std/[json, strutils]
|
||||
import chronos, results, ffi
|
||||
import libp2p/[protocols/ping, switch, multiaddress, multicodec]
|
||||
import logos_delivery/waku/[waku, waku_core/peers, node/waku_node], library/declare_lib
|
||||
import logos_delivery, library/declare_lib
|
||||
|
||||
proc waku_ping_peer(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
@ -10,35 +8,6 @@ proc waku_ping_peer(
|
||||
peerAddr: cstring,
|
||||
timeoutMs: cuint,
|
||||
) {.ffi.} =
|
||||
let peerInfo = peers.parsePeerInfo(($peerAddr).split(",")).valueOr:
|
||||
return err("PingRequest failed to parse peer addr: " & $error)
|
||||
|
||||
let timeout = chronos.milliseconds(timeoutMs)
|
||||
proc ping(): Future[Result[Duration, string]] {.async, gcsafe.} =
|
||||
try:
|
||||
let conn = await ctx.myLib[].waku.node.switch.dial(
|
||||
peerInfo.peerId, peerInfo.addrs, PingCodec
|
||||
)
|
||||
defer:
|
||||
await conn.close()
|
||||
|
||||
let pingRTT = await ctx.myLib[].waku.node.libp2pPing.ping(conn)
|
||||
if pingRTT == 0.nanos:
|
||||
return err("could not ping peer: rtt-0")
|
||||
return ok(pingRTT)
|
||||
except CatchableError as exc:
|
||||
return err("could not ping peer: " & exc.msg)
|
||||
|
||||
let pingFuture = ping()
|
||||
let pingRTT: Duration =
|
||||
if timeout == chronos.milliseconds(0): # No timeout expected
|
||||
(await pingFuture).valueOr:
|
||||
return err("ping failed, no timeout expected: " & error)
|
||||
else:
|
||||
let timedOut = not (await pingFuture.withTimeout(timeout))
|
||||
if timedOut:
|
||||
return err("ping timed out")
|
||||
pingFuture.read().valueOr:
|
||||
return err("failed to read ping future: " & error)
|
||||
|
||||
return ok($(pingRTT.nanos))
|
||||
let rttNanos = (await ctx.myLib[].waku.pingPeer($peerAddr, int(timeoutMs))).valueOr:
|
||||
return err(error)
|
||||
return ok($rttNanos)
|
||||
|
||||
@ -1,29 +1,14 @@
|
||||
import logos_delivery/waku/compat/option_valueor
|
||||
import options, std/[strutils, sequtils]
|
||||
import std/[strutils, sequtils]
|
||||
import chronicles, chronos, results, ffi
|
||||
import
|
||||
logos_delivery/waku/waku_filter_v2/client,
|
||||
logos_delivery,
|
||||
logos_delivery/waku/waku_core/message/message,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/waku/waku_relay,
|
||||
logos_delivery/waku/waku_filter_v2/common,
|
||||
logos_delivery/waku/waku_core/subscription/push_handler,
|
||||
logos_delivery/waku/node/peer_manager/peer_manager,
|
||||
logos_delivery/waku/waku_node,
|
||||
logos_delivery/waku/waku_core/topics/pubsub_topic,
|
||||
logos_delivery/waku/waku_core/topics/content_topic,
|
||||
library/events/json_message_event,
|
||||
library/declare_lib
|
||||
|
||||
const FilterOpTimeout = 5.seconds
|
||||
|
||||
proc checkFilterClientMounted(waku: Waku): Result[string, string] =
|
||||
if waku.node.wakuFilterClient.isNil():
|
||||
let errorMsg = "wakuFilterClient is not mounted"
|
||||
error "fail filter process", error = errorMsg
|
||||
return err(errorMsg)
|
||||
return ok("")
|
||||
|
||||
proc waku_filter_subscribe(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
callback: FFICallBack,
|
||||
@ -31,33 +16,20 @@ proc waku_filter_subscribe(
|
||||
pubSubTopic: cstring,
|
||||
contentTopics: cstring,
|
||||
) {.ffi.} =
|
||||
proc onReceivedMessage(ctx: ptr FFIContext): WakuRelayHandler =
|
||||
proc onReceivedMessage(ctx: ptr FFIContext[LogosDelivery]): FilterPushHandler =
|
||||
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
|
||||
callEventCallback(ctx, "onReceivedMessage"):
|
||||
$JsonMessageEvent.new(pubsubTopic, msg)
|
||||
|
||||
checkFilterClientMounted(ctx.myLib[].waku).isOkOr:
|
||||
return err($error)
|
||||
|
||||
var filterPushEventCallback = FilterPushHandler(onReceivedMessage(ctx))
|
||||
ctx.myLib[].waku.node.wakuFilterClient.registerPushHandler(filterPushEventCallback)
|
||||
|
||||
let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
|
||||
let errorMsg = "could not find peer with WakuFilterSubscribeCodec when subscribing"
|
||||
error "fail filter subscribe", error = errorMsg
|
||||
return err(errorMsg)
|
||||
|
||||
let subFut = ctx.myLib[].waku.node.filterSubscribe(
|
||||
some(PubsubTopic($pubsubTopic)),
|
||||
($contentTopics).split(",").mapIt(ContentTopic(it)),
|
||||
peer,
|
||||
)
|
||||
if not await subFut.withTimeout(FilterOpTimeout):
|
||||
let errorMsg = "filter subscription timed out"
|
||||
error "fail filter unsubscribe", error = errorMsg
|
||||
|
||||
return err(errorMsg)
|
||||
|
||||
(
|
||||
await ctx.myLib[].waku.filterSubscribe(
|
||||
PubsubTopic($pubSubTopic),
|
||||
($contentTopics).split(",").mapIt(ContentTopic(it)),
|
||||
FilterPushHandler(onReceivedMessage(ctx)),
|
||||
)
|
||||
).isOkOr:
|
||||
error "fail filter subscribe", error = error
|
||||
return err(error)
|
||||
return ok("")
|
||||
|
||||
proc waku_filter_unsubscribe(
|
||||
@ -67,43 +39,19 @@ proc waku_filter_unsubscribe(
|
||||
pubSubTopic: cstring,
|
||||
contentTopics: cstring,
|
||||
) {.ffi.} =
|
||||
checkFilterClientMounted(ctx.myLib[].waku).isOkOr:
|
||||
return err($error)
|
||||
|
||||
let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
|
||||
let errorMsg =
|
||||
"could not find peer with WakuFilterSubscribeCodec when unsubscribing"
|
||||
error "fail filter process", error = errorMsg
|
||||
return err(errorMsg)
|
||||
|
||||
let subFut = ctx.myLib[].waku.node.filterUnsubscribe(
|
||||
some(PubsubTopic($pubsubTopic)),
|
||||
($contentTopics).split(",").mapIt(ContentTopic(it)),
|
||||
peer,
|
||||
)
|
||||
if not await subFut.withTimeout(FilterOpTimeout):
|
||||
let errorMsg = "filter un-subscription timed out"
|
||||
error "fail filter unsubscribe", error = errorMsg
|
||||
return err(errorMsg)
|
||||
(
|
||||
await ctx.myLib[].waku.filterUnsubscribe(
|
||||
PubsubTopic($pubSubTopic), ($contentTopics).split(",").mapIt(ContentTopic(it))
|
||||
)
|
||||
).isOkOr:
|
||||
error "fail filter unsubscribe", error = error
|
||||
return err(error)
|
||||
return ok("")
|
||||
|
||||
proc waku_filter_unsubscribe_all(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
checkFilterClientMounted(ctx.myLib[].waku).isOkOr:
|
||||
return err($error)
|
||||
|
||||
let peer = ctx.myLib[].waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
|
||||
let errorMsg =
|
||||
"could not find peer with WakuFilterSubscribeCodec when unsubscribing all"
|
||||
error "fail filter unsubscribe all", error = errorMsg
|
||||
return err(errorMsg)
|
||||
|
||||
let unsubFut = ctx.myLib[].waku.node.filterUnsubscribeAll(peer)
|
||||
|
||||
if not await unsubFut.withTimeout(FilterOpTimeout):
|
||||
let errorMsg = "filter un-subscription all timed out"
|
||||
error "fail filter unsubscribe all", error = errorMsg
|
||||
|
||||
return err(errorMsg)
|
||||
(await ctx.myLib[].waku.filterUnsubscribeAll()).isOkOr:
|
||||
error "fail filter unsubscribe all", error = error
|
||||
return err(error)
|
||||
return ok("")
|
||||
|
||||
@ -1,14 +1,9 @@
|
||||
import logos_delivery/waku/compat/option_valueor
|
||||
import options, std/[json, strformat]
|
||||
import std/[json, strformat]
|
||||
import chronicles, chronos, results, ffi
|
||||
import
|
||||
logos_delivery/waku/waku_core/message/message,
|
||||
logos_delivery/waku/waku_core/codecs,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery,
|
||||
logos_delivery/waku/waku_core/message,
|
||||
logos_delivery/waku/waku_core/topics/pubsub_topic,
|
||||
logos_delivery/waku/waku_lightpush_legacy/client,
|
||||
logos_delivery/waku/node/peer_manager/peer_manager,
|
||||
library/events/json_message_event,
|
||||
library/declare_lib
|
||||
|
||||
@ -19,11 +14,6 @@ proc waku_lightpush_publish(
|
||||
pubSubTopic: cstring,
|
||||
jsonWakuMessage: cstring,
|
||||
) {.ffi.} =
|
||||
if ctx.myLib[].waku.node.wakuLightpushClient.isNil():
|
||||
let errorMsg = "LightpushRequest waku.node.wakuLightpushClient is nil"
|
||||
error "PUBLISH failed", error = errorMsg
|
||||
return err(errorMsg)
|
||||
|
||||
var jsonMessage: JsonMessage
|
||||
try:
|
||||
let jsonContent = parseJson($jsonWakuMessage)
|
||||
@ -35,18 +25,10 @@ proc waku_lightpush_publish(
|
||||
let msg = json_message_event.toWakuMessage(jsonMessage).valueOr:
|
||||
return err("Problem building the WakuMessage: " & $error)
|
||||
|
||||
let peerOpt = ctx.myLib[].waku.node.peerManager.selectPeer(WakuLightPushCodec)
|
||||
if peerOpt.isNone():
|
||||
let errorMsg = "failed to lightpublish message, no suitable remote peers"
|
||||
error "PUBLISH failed", error = errorMsg
|
||||
return err(errorMsg)
|
||||
|
||||
let msgHashHex = (
|
||||
await ctx.myLib[].waku.node.wakuLegacyLightpushClient.publish(
|
||||
$pubsubTopic, msg, peer = peerOpt.get()
|
||||
)
|
||||
await ctx.myLib[].waku.lightpushPublish(PubsubTopic($pubSubTopic), msg)
|
||||
).valueOr:
|
||||
error "PUBLISH failed", error = error
|
||||
return err($error)
|
||||
return err(error)
|
||||
|
||||
return ok(msgHashHex)
|
||||
|
||||
@ -1,17 +1,10 @@
|
||||
import logos_delivery/waku/compat/option_valueor
|
||||
import std/[net, sequtils, strutils, json], strformat
|
||||
import chronicles, chronos, stew/byteutils, results, ffi
|
||||
import std/[strutils, json]
|
||||
import chronicles, chronos, results, ffi
|
||||
import
|
||||
logos_delivery/waku/waku_core/message/message,
|
||||
logos_delivery/waku/factory/validator_signed,
|
||||
logos_delivery/waku/waku,
|
||||
tools/confutils/cli_args,
|
||||
logos_delivery/waku/waku_core/message,
|
||||
logos_delivery,
|
||||
logos_delivery/waku/waku_core/topics/pubsub_topic,
|
||||
logos_delivery/waku/waku_core/topics,
|
||||
logos_delivery/waku/node/waku_node/relay,
|
||||
logos_delivery/waku/waku_core/message,
|
||||
logos_delivery/waku/waku_relay/protocol,
|
||||
logos_delivery/waku/node/peer_manager,
|
||||
library/events/json_message_event,
|
||||
library/declare_lib
|
||||
|
||||
@ -21,11 +14,11 @@ proc waku_relay_get_peers_in_mesh(
|
||||
userData: pointer,
|
||||
pubSubTopic: cstring,
|
||||
) {.ffi.} =
|
||||
let meshPeers = ctx.myLib[].waku.node.wakuRelay.getPeersInMesh($pubsubTopic).valueOr:
|
||||
let peers = (await ctx.myLib[].waku.relayPeersInMesh(PubsubTopic($pubSubTopic))).valueOr:
|
||||
error "LIST_MESH_PEERS failed", error = error
|
||||
return err($error)
|
||||
return err(error)
|
||||
## returns a comma-separated string of peerIDs
|
||||
return ok(meshPeers.mapIt($it).join(","))
|
||||
return ok(peers.join(","))
|
||||
|
||||
proc waku_relay_get_num_peers_in_mesh(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
@ -33,10 +26,10 @@ proc waku_relay_get_num_peers_in_mesh(
|
||||
userData: pointer,
|
||||
pubSubTopic: cstring,
|
||||
) {.ffi.} =
|
||||
let numPeersInMesh = ctx.myLib[].waku.node.wakuRelay.getNumPeersInMesh($pubsubTopic).valueOr:
|
||||
let n = (await ctx.myLib[].waku.relayNumPeersInMesh(PubsubTopic($pubSubTopic))).valueOr:
|
||||
error "NUM_MESH_PEERS failed", error = error
|
||||
return err($error)
|
||||
return ok($numPeersInMesh)
|
||||
return err(error)
|
||||
return ok($n)
|
||||
|
||||
proc waku_relay_get_connected_peers(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
@ -45,11 +38,10 @@ proc waku_relay_get_connected_peers(
|
||||
pubSubTopic: cstring,
|
||||
) {.ffi.} =
|
||||
## Returns the list of all connected peers to an specific pubsub topic
|
||||
let connPeers = ctx.myLib[].waku.node.wakuRelay.getConnectedPeers($pubsubTopic).valueOr:
|
||||
let peers = (await ctx.myLib[].waku.relayConnectedPeers(PubsubTopic($pubSubTopic))).valueOr:
|
||||
error "LIST_CONNECTED_PEERS failed", error = error
|
||||
return err($error)
|
||||
## returns a comma-separated string of peerIDs
|
||||
return ok(connPeers.mapIt($it).join(","))
|
||||
return err(error)
|
||||
return ok(peers.join(","))
|
||||
|
||||
proc waku_relay_get_num_connected_peers(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
@ -57,10 +49,10 @@ proc waku_relay_get_num_connected_peers(
|
||||
userData: pointer,
|
||||
pubSubTopic: cstring,
|
||||
) {.ffi.} =
|
||||
let numConnPeers = ctx.myLib[].waku.node.wakuRelay.getNumConnectedPeers($pubsubTopic).valueOr:
|
||||
let n = (await ctx.myLib[].waku.relayNumConnectedPeers(PubsubTopic($pubSubTopic))).valueOr:
|
||||
error "NUM_CONNECTED_PEERS failed", error = error
|
||||
return err($error)
|
||||
return ok($numConnPeers)
|
||||
return err(error)
|
||||
return ok($n)
|
||||
|
||||
proc waku_relay_add_protected_shard(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
@ -71,15 +63,12 @@ proc waku_relay_add_protected_shard(
|
||||
publicKey: cstring,
|
||||
) {.ffi.} =
|
||||
## Protects a shard with a public key
|
||||
try:
|
||||
let relayShard = RelayShard(clusterId: uint16(clusterId), shardId: uint16(shardId))
|
||||
let protectedShard = ProtectedShard.parseCmdArg($relayShard & ":" & $publicKey)
|
||||
ctx.myLib[].waku.node.wakuRelay.addSignedShardsValidator(
|
||||
@[protectedShard], uint16(clusterId)
|
||||
(
|
||||
await ctx.myLib[].waku.relayAddProtectedShard(
|
||||
uint16(clusterId), uint16(shardId), $publicKey
|
||||
)
|
||||
except ValueError as exc:
|
||||
return err("ERROR in waku_relay_add_protected_shard: " & exc.msg)
|
||||
|
||||
).isOkOr:
|
||||
return err(error)
|
||||
return ok("")
|
||||
|
||||
proc waku_relay_subscribe(
|
||||
@ -88,20 +77,18 @@ proc waku_relay_subscribe(
|
||||
userData: pointer,
|
||||
pubSubTopic: cstring,
|
||||
) {.ffi.} =
|
||||
echo "Subscribing to topic: " & $pubSubTopic & " ..."
|
||||
proc onReceivedMessage(ctx: ptr FFIContext[LogosDelivery]): WakuRelayHandler =
|
||||
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
|
||||
callEventCallback(ctx, "onReceivedMessage"):
|
||||
$JsonMessageEvent.new(pubsubTopic, msg)
|
||||
|
||||
var cb = onReceivedMessage(ctx)
|
||||
|
||||
ctx.myLib[].waku.node.subscribe(
|
||||
(kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic),
|
||||
handler = WakuRelayHandler(cb),
|
||||
(
|
||||
await ctx.myLib[].waku.relaySubscribe(
|
||||
PubsubTopic($pubSubTopic), WakuRelayHandler(onReceivedMessage(ctx))
|
||||
)
|
||||
).isOkOr:
|
||||
error "SUBSCRIBE failed", error = error
|
||||
return err($error)
|
||||
return err(error)
|
||||
return ok("")
|
||||
|
||||
proc waku_relay_unsubscribe(
|
||||
@ -110,12 +97,9 @@ proc waku_relay_unsubscribe(
|
||||
userData: pointer,
|
||||
pubSubTopic: cstring,
|
||||
) {.ffi.} =
|
||||
ctx.myLib[].waku.node.unsubscribe(
|
||||
(kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic)
|
||||
).isOkOr:
|
||||
(await ctx.myLib[].waku.relayUnsubscribe(PubsubTopic($pubSubTopic))).isOkOr:
|
||||
error "UNSUBSCRIBE failed", error = error
|
||||
return err($error)
|
||||
|
||||
return err(error)
|
||||
return ok("")
|
||||
|
||||
proc waku_relay_publish(
|
||||
@ -126,31 +110,30 @@ proc waku_relay_publish(
|
||||
jsonWakuMessage: cstring,
|
||||
timeoutMs: cuint,
|
||||
) {.ffi.} =
|
||||
var
|
||||
# https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms
|
||||
jsonMessage: JsonMessage
|
||||
var jsonMessage: JsonMessage
|
||||
try:
|
||||
let jsonContent = parseJson($jsonWakuMessage)
|
||||
jsonMessage = JsonMessage.fromJsonNode(jsonContent).valueOr:
|
||||
raise newException(JsonParsingError, $error)
|
||||
except JsonParsingError as exc:
|
||||
return err(fmt"Error parsing json message: {exc.msg}")
|
||||
return err("Error parsing json message: " & exc.msg)
|
||||
|
||||
let msg = json_message_event.toWakuMessage(jsonMessage).valueOr:
|
||||
return err("Problem building the WakuMessage: " & $error)
|
||||
|
||||
(await ctx.myLib[].waku.node.wakuRelay.publish($pubsubTopic, msg)).isOkOr:
|
||||
let msgHash = (
|
||||
await ctx.myLib[].waku.relayPublish(PubsubTopic($pubSubTopic), msg, uint32(timeoutMs))
|
||||
).valueOr:
|
||||
error "PUBLISH failed", error = error
|
||||
return err($error)
|
||||
|
||||
let msgHash = computeMessageHash($pubSubTopic, msg).to0xHex
|
||||
return err(error)
|
||||
return ok(msgHash)
|
||||
|
||||
proc waku_default_pubsub_topic(
|
||||
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
# https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic
|
||||
return ok(DefaultPubsubTopic)
|
||||
let topic = (await ctx.myLib[].waku.defaultPubsubTopic()).valueOr:
|
||||
return err(error)
|
||||
return ok(string(topic))
|
||||
|
||||
proc waku_content_topic(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
@ -161,9 +144,13 @@ proc waku_content_topic(
|
||||
contentTopicName: cstring,
|
||||
encoding: cstring,
|
||||
) {.ffi.} =
|
||||
# https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding
|
||||
|
||||
return ok(fmt"/{$appName}/{$appVersion}/{$contentTopicName}/{$encoding}")
|
||||
let topic = (
|
||||
await ctx.myLib[].waku.buildContentTopic(
|
||||
$appName, uint32(appVersion), $contentTopicName, $encoding
|
||||
)
|
||||
).valueOr:
|
||||
return err(error)
|
||||
return ok(string(topic))
|
||||
|
||||
proc waku_pubsub_topic(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
@ -171,5 +158,6 @@ proc waku_pubsub_topic(
|
||||
userData: pointer,
|
||||
topicName: cstring,
|
||||
) {.ffi.} =
|
||||
# https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding
|
||||
return ok(fmt"/waku/2/{$topicName}")
|
||||
let topic = (await ctx.myLib[].waku.buildPubsubTopic($topicName)).valueOr:
|
||||
return err(error)
|
||||
return ok(string(topic))
|
||||
|
||||
@ -1,13 +1,10 @@
|
||||
import logos_delivery/waku/compat/option_valueor
|
||||
import std/[json, sugar, strutils, options]
|
||||
import chronos, chronicles, results, stew/byteutils, ffi
|
||||
import std/[json, sugar, options]
|
||||
import chronos, chronicles, results, ffi
|
||||
import
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery,
|
||||
library/utils,
|
||||
logos_delivery/waku/waku_core/peers,
|
||||
logos_delivery/waku/waku_core/message/digest,
|
||||
logos_delivery/waku/waku_store/common,
|
||||
logos_delivery/waku/waku_store/client,
|
||||
logos_delivery/waku/common/paging,
|
||||
library/declare_lib
|
||||
|
||||
@ -83,13 +80,10 @@ proc waku_store_query(
|
||||
|
||||
let storeQueryRequest = ?fromJsonNode(jsonContentRes.get())
|
||||
|
||||
let peer = peers.parsePeerInfo(($peerAddr).split(",")).valueOr:
|
||||
return err("StoreRequest failed to parse peer addr: " & $error)
|
||||
|
||||
let queryResponse = (
|
||||
await ctx.myLib[].waku.node.wakuStoreClient.query(storeQueryRequest, peer)
|
||||
await ctx.myLib[].waku.storeQuery(storeQueryRequest, $peerAddr, int(timeoutMs))
|
||||
).valueOr:
|
||||
return err("StoreRequest failed store query: " & $error)
|
||||
return err("StoreRequest failed store query: " & error)
|
||||
|
||||
let res = $(%*(queryResponse.toHex()))
|
||||
return ok(res) ## returning the response in json format
|
||||
|
||||
@ -3,9 +3,8 @@ import chronos, chronicles, results, ffi
|
||||
import
|
||||
logos_delivery,
|
||||
logos_delivery/waku/node/waku_node,
|
||||
logos_delivery/waku/events/message_events,
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/waku/events/[message_events, health_events],
|
||||
logos_delivery/waku/api/events/health_events,
|
||||
tools/confutils/conf_from_json,
|
||||
../declare_lib,
|
||||
../json_event
|
||||
|
||||
89
logos_delivery/channels/api/channel_lifecycle.nim
Normal file
89
logos_delivery/channels/api/channel_lifecycle.nim
Normal file
@ -0,0 +1,89 @@
|
||||
## Reliable Channel layer API — channel lifecycle
|
||||
## (createReliableChannel / closeChannel).
|
||||
import std/[options, tables]
|
||||
import results, chronos, chronicles
|
||||
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/channels/reliable_channel_manager
|
||||
import logos_delivery/channels/reliable_channel
|
||||
import logos_delivery/waku/persistency/sds_persistency
|
||||
|
||||
# ReliableChannel, SendHandler, config and wire-version markers.
|
||||
export reliable_channel
|
||||
|
||||
const SdsJobId = "sds"
|
||||
## One persistency job shared by every channel's SDS state; rows are
|
||||
## keyed by channelId.
|
||||
|
||||
proc sdsPersistence(): Option[Persistence] =
|
||||
## SDS backend from the Persistency singleton; memory-only fallback when
|
||||
## it is unavailable (e.g. unit tests).
|
||||
let p = Persistency.instance().valueOr:
|
||||
info "SDS persistence disabled, running memory-only", reason = $error
|
||||
return none(Persistence)
|
||||
let job = p.openJob(SdsJobId).valueOr:
|
||||
warn "SDS persistence disabled, could not open persistency job",
|
||||
jobId = SdsJobId, reason = $error
|
||||
return none(Persistence)
|
||||
return some(newSdsPersistence(job))
|
||||
|
||||
proc createReliableChannel*(
|
||||
self: ReliableChannelManager,
|
||||
channelId: ChannelId,
|
||||
contentTopic: ContentTopic,
|
||||
senderId: SdsParticipantID,
|
||||
sendHandler: SendHandler = nil,
|
||||
): Result[ChannelId, string] =
|
||||
## Spec entry point. The `sendHandler` and `rng` the channel needs are
|
||||
## sourced from the owning `ReliableChannelManager` rather than passed
|
||||
## per call. Encryption is wired up through the `Encrypt`/`Decrypt`
|
||||
## request brokers — the application installs its own providers
|
||||
## (or `setNoopEncryption()`) before traffic flows.
|
||||
##
|
||||
## `sendHandler` defaults to the manager's default (constructed at mount
|
||||
## from `MessagingClient.send`); tests pass a fake to bypass the network.
|
||||
if self.channels.hasKey(channelId):
|
||||
return err("channel already exists: " & channelId)
|
||||
|
||||
let segConfig = SegmentationConfig(
|
||||
segmentSizeBytes: DefaultSegmentSizeBytes,
|
||||
enableReedSolomon: false,
|
||||
persistence: nil,
|
||||
)
|
||||
let sdsConfig = SdsConfig(
|
||||
acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs,
|
||||
maxRetransmissions: DefaultMaxRetransmissions,
|
||||
causalHistorySize: DefaultCausalHistorySize,
|
||||
persistence: sdsPersistence(),
|
||||
)
|
||||
let rateConfig = RateLimitConfig(
|
||||
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
|
||||
)
|
||||
|
||||
let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler
|
||||
|
||||
let chn = ReliableChannel.new(
|
||||
sendHandler = effectiveSendHandler,
|
||||
channelId = channelId,
|
||||
contentTopic = contentTopic,
|
||||
senderId = senderId,
|
||||
segConfig = segConfig,
|
||||
sdsConfig = sdsConfig,
|
||||
rateConfig = rateConfig,
|
||||
brokerCtx = self.brokerCtx,
|
||||
)
|
||||
|
||||
self.channels[channelId] = chn
|
||||
return ok(channelId)
|
||||
|
||||
proc closeChannel*(
|
||||
self: ReliableChannelManager, channelId: ChannelId
|
||||
): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
## Stops the channel's SDS loops and releases the channel. Persisted SDS
|
||||
## state survives, so re-creating the channel restores it.
|
||||
let chn = self.channels.getOrDefault(channelId)
|
||||
if chn.isNil():
|
||||
return err("unknown channel: " & channelId)
|
||||
self.channels.del(channelId)
|
||||
await chn.stop()
|
||||
return ok()
|
||||
@ -1,20 +1,20 @@
|
||||
## Reliable Channel event types emitted to API consumers.
|
||||
## Reliable Channel layer API — event surface.
|
||||
##
|
||||
## Lifecycle events for individual segments (sent / propagated / errored)
|
||||
## are the same as the network-level ones the MessagingClient already
|
||||
## emits — `requestId` is shared across layers — so we just re-export
|
||||
## `waku/events/message_events` and avoid declaring duplicates.
|
||||
## `messaging/api/events` and avoid declaring duplicates.
|
||||
##
|
||||
## Only the channel-level `MessageReceivedEvent` carries data that has
|
||||
## no analogue in the lower layer (reassembled application payload,
|
||||
## senderId, channelId), so it lives here.
|
||||
|
||||
import logos_delivery/waku/events/message_events as waku_message_events
|
||||
import logos_delivery/messaging/api/events as messaging_events
|
||||
import brokers/event_broker
|
||||
|
||||
import ./types as channel_types
|
||||
import logos_delivery/channels/types as channel_types
|
||||
|
||||
export waku_message_events, channel_types, event_broker
|
||||
export messaging_events, channel_types, event_broker
|
||||
|
||||
EventBroker:
|
||||
type ChannelMessageReceivedEvent* = object
|
||||
21
logos_delivery/channels/api/send.nim
Normal file
21
logos_delivery/channels/api/send.nim
Normal file
@ -0,0 +1,21 @@
|
||||
## Reliable Channel layer API — channel send operation.
|
||||
import std/tables
|
||||
import results, chronos
|
||||
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/channels/reliable_channel_manager
|
||||
import logos_delivery/channels/reliable_channel
|
||||
|
||||
proc send*(
|
||||
self: ReliableChannelManager,
|
||||
channelId: ChannelId,
|
||||
appPayload: seq[byte],
|
||||
ephemeral: bool = false,
|
||||
): Future[Result[RequestId, string]] {.async: (raises: []).} =
|
||||
## Spec-level entry point. Looks the channel up by id and delegates
|
||||
## to `ReliableChannel.send`, which exposes the visible pipeline
|
||||
## segmentation -> sds -> rate_limit_manager -> encryption.
|
||||
let chn = self.channels.getOrDefault(channelId)
|
||||
if chn.isNil():
|
||||
return err("unknown channel: " & channelId)
|
||||
return await chn.send(appPayload, ephemeral)
|
||||
@ -25,7 +25,7 @@ import logos_delivery/api/types
|
||||
import logos_delivery/messaging/delivery_service/send_service
|
||||
import logos_delivery/waku/waku_core/topics
|
||||
|
||||
import ./events
|
||||
import logos_delivery/channels/api/events
|
||||
import ./segmentation/segmentation
|
||||
import ./scalable_data_sync/scalable_data_sync
|
||||
import ./rate_limit_manager/rate_limit_manager
|
||||
|
||||
@ -13,21 +13,14 @@ import stew/byteutils
|
||||
|
||||
import brokers/broker_context
|
||||
|
||||
import logos_delivery/waku/events/message_events as waku_message_events
|
||||
import logos_delivery/messaging/messaging_client
|
||||
import logos_delivery/messaging/api/send
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/waku/waku_core/topics
|
||||
import logos_delivery/waku/persistency/sds_persistency
|
||||
|
||||
import ./reliable_channel
|
||||
import ./encryption/noop_encryption
|
||||
|
||||
export reliable_channel
|
||||
|
||||
const SdsJobId = "sds"
|
||||
## One persistency job shared by every channel's SDS state; rows are
|
||||
## keyed by channelId.
|
||||
|
||||
type
|
||||
ReliableChannelManagerConf* = object
|
||||
## Per-layer config object for the reliable
|
||||
@ -35,13 +28,13 @@ type
|
||||
## will move here in a follow-up PR); kept so each layer owns its own config.
|
||||
|
||||
ReliableChannelManager* = ref object
|
||||
channels: Table[ChannelId, ReliableChannel]
|
||||
channels*: Table[ChannelId, ReliableChannel] ## read by `channels/api.nim`
|
||||
messagingClient: MessagingClient ## The channel layer chains onto messaging.
|
||||
sendHandler: SendHandler
|
||||
sendHandler*: SendHandler
|
||||
## Default egress dispatch for channels created through this manager.
|
||||
## Built in `new` as a closure over `MessagingClient.send` so the channel
|
||||
## layer itself stays callable-only.
|
||||
brokerCtx: BrokerContext
|
||||
brokerCtx*: BrokerContext
|
||||
|
||||
proc new*(
|
||||
T: type ReliableChannelManager,
|
||||
@ -82,96 +75,6 @@ proc stop*(self: ReliableChannelManager) {.async.} =
|
||||
await chn.stop()
|
||||
self.channels.clear()
|
||||
|
||||
proc sdsPersistence(): Option[Persistence] =
|
||||
## SDS backend from the Persistency singleton; memory-only fallback when
|
||||
## it is unavailable (e.g. unit tests).
|
||||
let p = Persistency.instance().valueOr:
|
||||
info "SDS persistence disabled, running memory-only", reason = $error
|
||||
return none(Persistence)
|
||||
let job = p.openJob(SdsJobId).valueOr:
|
||||
warn "SDS persistence disabled, could not open persistency job",
|
||||
jobId = SdsJobId, reason = $error
|
||||
return none(Persistence)
|
||||
return some(newSdsPersistence(job))
|
||||
|
||||
proc createReliableChannel*(
|
||||
self: ReliableChannelManager,
|
||||
channelId: ChannelId,
|
||||
contentTopic: ContentTopic,
|
||||
senderId: SdsParticipantID,
|
||||
sendHandler: SendHandler = nil,
|
||||
): Result[ChannelId, string] =
|
||||
## Spec entry point. The `sendHandler` and `rng` the channel needs are
|
||||
## sourced from the owning `ReliableChannelManager` rather than passed
|
||||
## per call. Encryption is wired up through the `Encrypt`/`Decrypt`
|
||||
## request brokers — the application installs its own providers
|
||||
## (or `setNoopEncryption()`) before traffic flows.
|
||||
##
|
||||
## Segmentation, SDS and rate-limit configs will eventually be read
|
||||
## from the node's `NodeConfig`. Defaults for now.
|
||||
##
|
||||
## `sendHandler` defaults to the manager's default (constructed at mount
|
||||
## from `MessagingClient.send`); tests pass a fake to bypass the network.
|
||||
if self.channels.hasKey(channelId):
|
||||
return err("channel already exists: " & channelId)
|
||||
|
||||
let segConfig = SegmentationConfig(
|
||||
segmentSizeBytes: DefaultSegmentSizeBytes,
|
||||
enableReedSolomon: false,
|
||||
persistence: nil,
|
||||
)
|
||||
let sdsConfig = SdsConfig(
|
||||
acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs,
|
||||
maxRetransmissions: DefaultMaxRetransmissions,
|
||||
causalHistorySize: DefaultCausalHistorySize,
|
||||
persistence: sdsPersistence(),
|
||||
)
|
||||
let rateConfig = RateLimitConfig(
|
||||
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
|
||||
)
|
||||
|
||||
let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler
|
||||
|
||||
let chn = ReliableChannel.new(
|
||||
sendHandler = effectiveSendHandler,
|
||||
channelId = channelId,
|
||||
contentTopic = contentTopic,
|
||||
senderId = senderId,
|
||||
segConfig = segConfig,
|
||||
sdsConfig = sdsConfig,
|
||||
rateConfig = rateConfig,
|
||||
brokerCtx = self.brokerCtx,
|
||||
)
|
||||
|
||||
self.channels[channelId] = chn
|
||||
return ok(channelId)
|
||||
|
||||
proc closeChannel*(
|
||||
self: ReliableChannelManager, channelId: ChannelId
|
||||
): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
## Stops the channel's SDS loops and releases the channel. Persisted SDS
|
||||
## state survives, so re-creating the channel restores it.
|
||||
let chn = self.channels.getOrDefault(channelId)
|
||||
if chn.isNil():
|
||||
return err("unknown channel: " & channelId)
|
||||
self.channels.del(channelId)
|
||||
await chn.stop()
|
||||
return ok()
|
||||
|
||||
proc send*(
|
||||
self: ReliableChannelManager,
|
||||
channelId: ChannelId,
|
||||
appPayload: seq[byte],
|
||||
ephemeral: bool = false,
|
||||
): Future[Result[RequestId, string]] {.async: (raises: []).} =
|
||||
## Spec-level entry point. Looks the channel up by id and delegates
|
||||
## to `ReliableChannel.send`, which exposes the visible pipeline
|
||||
## segmentation -> sds -> rate_limit_manager -> encryption.
|
||||
let chn = self.channels.getOrDefault(channelId)
|
||||
if chn.isNil():
|
||||
return err("unknown channel: " & channelId)
|
||||
return await chn.send(appPayload, ephemeral)
|
||||
|
||||
## Inbound messages are not handed to the manager by direct call. Each
|
||||
## `ReliableChannel` installs its own `MessageReceivedEvent` listener
|
||||
## in `ReliableChannel.new`, filters by spec marker and `contentTopic`,
|
||||
|
||||
@ -11,12 +11,42 @@
|
||||
|
||||
import results, chronos, chronicles
|
||||
|
||||
# Each layer has a core module (type + new/start/stop) and an api/ folder whose
|
||||
# modules each implement a differentiated set of operations, plus an events
|
||||
# surface. The concentrator re-exports them so library consumers get the full
|
||||
# surface from `import logos_delivery`. (The per-layer `events` modules share a
|
||||
# stem, so they are imported under aliases.)
|
||||
|
||||
# Waku layer
|
||||
import logos_delivery/waku/waku
|
||||
export waku
|
||||
import
|
||||
logos_delivery/waku/api/[
|
||||
topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health,
|
||||
ping,
|
||||
]
|
||||
export
|
||||
topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health, ping
|
||||
import logos_delivery/waku/api/events/[message_events, health_events]
|
||||
export message_events, health_events
|
||||
|
||||
# Messaging layer
|
||||
import logos_delivery/messaging/messaging_client
|
||||
export messaging_client
|
||||
import logos_delivery/messaging/api/[subscription, send]
|
||||
export subscription, send
|
||||
import logos_delivery/messaging/api/events as messaging_api_events
|
||||
export messaging_api_events
|
||||
|
||||
# Reliable Channel layer
|
||||
import logos_delivery/channels/reliable_channel_manager
|
||||
export reliable_channel_manager
|
||||
import logos_delivery/channels/api/channel_lifecycle
|
||||
export channel_lifecycle
|
||||
import logos_delivery/channels/api/send as channel_send
|
||||
export channel_send
|
||||
import logos_delivery/channels/api/events as channels_api_events
|
||||
export channels_api_events
|
||||
|
||||
import logos_delivery/waku/factory/waku_conf
|
||||
import logos_delivery/waku/factory/app_callbacks
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
## Messaging layer API — event surface (messaging-level message events).
|
||||
import brokers/event_broker
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/waku/[waku_core/message, waku_core/topics]
|
||||
export types
|
||||
import logos_delivery/waku/waku_core/message
|
||||
export event_broker, types
|
||||
|
||||
EventBroker:
|
||||
# Event emitted when a message is sent to the network
|
||||
@ -27,9 +28,3 @@ EventBroker:
|
||||
type MessageReceivedEvent* = object
|
||||
messageHash*: string
|
||||
message*: WakuMessage
|
||||
|
||||
EventBroker:
|
||||
# Internal event emitted when a message arrives from the network via any protocol
|
||||
type MessageSeenEvent* = object
|
||||
topic*: PubsubTopic
|
||||
message*: WakuMessage
|
||||
34
logos_delivery/messaging/api/send.nim
Normal file
34
logos_delivery/messaging/api/send.nim
Normal file
@ -0,0 +1,34 @@
|
||||
## Messaging layer API — send operation.
|
||||
import results, chronos, chronicles
|
||||
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/messaging/messaging_client
|
||||
import logos_delivery/waku/node/[waku_node, subscription_manager]
|
||||
import logos_delivery/messaging/delivery_service/send_service
|
||||
import logos_delivery/messaging/delivery_service/send_service/delivery_task
|
||||
|
||||
proc send*(
|
||||
self: MessagingClient, envelope: MessageEnvelope
|
||||
): Future[Result[RequestId, string]] {.async.} =
|
||||
## High-level messaging API send. Auto-subscribes to the content topic
|
||||
## (so the local node sees its own gossipsub broadcast), builds a
|
||||
## `DeliveryTask`, and hands it to the send service. Returns the request
|
||||
## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`.
|
||||
?self.checkApiAvailability()
|
||||
|
||||
let isSubbed =
|
||||
self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false)
|
||||
if not isSubbed:
|
||||
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
|
||||
self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
|
||||
warn "Failed to auto-subscribe", error = error
|
||||
return err("Failed to auto-subscribe before sending: " & error)
|
||||
|
||||
let requestId = RequestId.new(self.node.rng)
|
||||
|
||||
let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr:
|
||||
return err("MessagingClient.send: Failed to create delivery task: " & error)
|
||||
|
||||
asyncSpawn self.sendService.send(deliveryTask)
|
||||
|
||||
return ok(requestId)
|
||||
18
logos_delivery/messaging/api/subscription.nim
Normal file
18
logos_delivery/messaging/api/subscription.nim
Normal file
@ -0,0 +1,18 @@
|
||||
## Messaging layer API — subscription operations.
|
||||
import results, chronos
|
||||
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/messaging/messaging_client
|
||||
import logos_delivery/waku/node/[waku_node, subscription_manager]
|
||||
|
||||
proc subscribe*(
|
||||
self: MessagingClient, contentTopic: ContentTopic
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
?self.checkApiAvailability()
|
||||
return self.node.subscriptionManager.subscribe(contentTopic)
|
||||
|
||||
proc unsubscribe*(
|
||||
self: MessagingClient, contentTopic: ContentTopic
|
||||
): Result[void, string] =
|
||||
?self.checkApiAvailability()
|
||||
return self.node.subscriptionManager.unsubscribe(contentTopic)
|
||||
@ -13,11 +13,12 @@ import
|
||||
waku_store/client,
|
||||
waku_store/common,
|
||||
waku_filter_v2/client,
|
||||
events/message_events,
|
||||
events/health_events,
|
||||
api/events/message_events,
|
||||
api/events/health_events,
|
||||
waku_node,
|
||||
node/subscription_manager,
|
||||
]
|
||||
import logos_delivery/messaging/api/events
|
||||
|
||||
const MaxMessageLife = chronos.minutes(7) ## Max time we will keep track of rx messages
|
||||
|
||||
|
||||
@ -18,8 +18,8 @@ import
|
||||
waku_rln_relay/rln_relay,
|
||||
waku_lightpush/client,
|
||||
waku_lightpush/callbacks,
|
||||
events/message_events,
|
||||
]
|
||||
import logos_delivery/messaging/api/events
|
||||
|
||||
logScope:
|
||||
topics = "send service"
|
||||
|
||||
@ -1,10 +1,10 @@
|
||||
## Messaging layer core: the `MessagingClient` type plus its construction and
|
||||
## lifecycle. The public operations (subscribe / unsubscribe / send) live in
|
||||
## `messaging/api.nim`.
|
||||
import results, chronos
|
||||
import chronicles
|
||||
import
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/waku/node/[waku_node, subscription_manager],
|
||||
logos_delivery/messaging/delivery_service/[recv_service, send_service],
|
||||
logos_delivery/messaging/delivery_service/send_service/delivery_task
|
||||
logos_delivery/waku/node/waku_node,
|
||||
logos_delivery/messaging/delivery_service/[recv_service, send_service]
|
||||
|
||||
type
|
||||
MessagingClientConf* = object
|
||||
@ -14,7 +14,7 @@ type
|
||||
useP2PReliability*: bool
|
||||
|
||||
MessagingClient* = ref object
|
||||
node: WakuNode
|
||||
node*: WakuNode ## Waku core driven by this layer; read by `messaging/api.nim`.
|
||||
sendService*: SendService
|
||||
recvService*: RecvService
|
||||
started: bool
|
||||
@ -43,46 +43,9 @@ proc stop*(self: MessagingClient) {.async.} =
|
||||
await self.recvService.stopRecvService()
|
||||
self.started = false
|
||||
|
||||
proc checkApiAvailability(self: MessagingClient): Result[void, string] =
|
||||
proc checkApiAvailability*(self: MessagingClient): Result[void, string] =
|
||||
## Shared guard for the api operation module.
|
||||
if self.isNil():
|
||||
return err("MessagingClient is not initialized")
|
||||
|
||||
return ok()
|
||||
|
||||
proc subscribe*(
|
||||
self: MessagingClient, contentTopic: ContentTopic
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
?checkApiAvailability(self)
|
||||
|
||||
return self.node.subscriptionManager.subscribe(contentTopic)
|
||||
|
||||
proc unsubscribe*(
|
||||
self: MessagingClient, contentTopic: ContentTopic
|
||||
): Result[void, string] =
|
||||
?checkApiAvailability(self)
|
||||
|
||||
return self.node.subscriptionManager.unsubscribe(contentTopic)
|
||||
|
||||
proc send*(
|
||||
self: MessagingClient, envelope: MessageEnvelope
|
||||
): Future[Result[RequestId, string]] {.async.} =
|
||||
## High-level messaging API send. Auto-subscribes to the content topic
|
||||
## (so the local node sees its own gossipsub broadcast), builds a
|
||||
## `DeliveryTask`, and hands it to the send service. Returns the request
|
||||
## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`.
|
||||
let isSubbed =
|
||||
self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false)
|
||||
if not isSubbed:
|
||||
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
|
||||
self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
|
||||
warn "Failed to auto-subscribe", error = error
|
||||
return err("Failed to auto-subscribe before sending: " & error)
|
||||
|
||||
let requestId = RequestId.new(self.node.rng)
|
||||
|
||||
let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr:
|
||||
return err("MessagingClient.send: Failed to create delivery task: " & error)
|
||||
|
||||
asyncSpawn self.sendService.send(deliveryTask)
|
||||
|
||||
return ok(requestId)
|
||||
|
||||
36
logos_delivery/waku/api/debug.nim
Normal file
36
logos_delivery/waku/api/debug.nim
Normal file
@ -0,0 +1,36 @@
|
||||
## Waku layer API — debug / info operations.
|
||||
{.push raises: [].}
|
||||
|
||||
import results, chronos, chronicles, metrics
|
||||
import eth/p2p/discoveryv5/enr
|
||||
|
||||
import logos_delivery/waku/waku
|
||||
import logos_delivery/waku/[waku_core, node/waku_node]
|
||||
|
||||
proc version*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
return ok(WakuNodeVersionString)
|
||||
|
||||
proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.info().listenAddresses)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.enr.toURI())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
return ok($self.node.peerId())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc metrics*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
{.gcsafe.}:
|
||||
try:
|
||||
return ok(defaultRegistry.toText())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
76
logos_delivery/waku/api/discovery.nim
Normal file
76
logos_delivery/waku/api/discovery.nim
Normal file
@ -0,0 +1,76 @@
|
||||
## Waku layer API — discovery operations (DNS, discv5, peer exchange).
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[net, sequtils]
|
||||
import results, chronos, chronicles
|
||||
|
||||
import logos_delivery/waku/waku
|
||||
import
|
||||
logos_delivery/waku/[
|
||||
waku_core,
|
||||
node/waku_node,
|
||||
node/waku_node/peer_exchange,
|
||||
discovery/waku_dnsdisc,
|
||||
discovery/waku_discv5,
|
||||
]
|
||||
|
||||
proc dnsDiscovery*(
|
||||
self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
let dnsNameServers = @[parseIpAddress(nameServer)]
|
||||
let discoveredPeers = (
|
||||
await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers)
|
||||
).valueOr:
|
||||
return err("failed discovering peers from DNS: " & $error)
|
||||
|
||||
var multiAddresses = newSeq[string]()
|
||||
for discPeer in discoveredPeers:
|
||||
for address in discPeer.addrs:
|
||||
multiAddresses.add($address & "/p2p/" & $discPeer)
|
||||
|
||||
return ok(multiAddresses)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc discv5UpdateBootnodes*(
|
||||
self: Waku, bootnodes: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
## `bootnodes` is a JSON array of ENRs, e.g. `["enr:...", "enr:..."]`.
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
self.wakuDiscv5.updateBootstrapRecords(bootnodes).isOkOr:
|
||||
return err("error in discv5UpdateBootnodes: " & $error)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
(await self.wakuDiscv5.start()).isOkOr:
|
||||
return err("error starting discv5: " & $error)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
await self.wakuDiscv5.stop()
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerExchangeRequest*(
|
||||
self: Waku, numPeers: uint64
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
try:
|
||||
let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr:
|
||||
return err("failed peer exchange: " & $error)
|
||||
return ok(numPeersRecv)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
8
logos_delivery/waku/api/events/events.nim
Normal file
8
logos_delivery/waku/api/events/events.nim
Normal file
@ -0,0 +1,8 @@
|
||||
import
|
||||
./[
|
||||
message_events, filter_subscribe_events, health_events, peer_events,
|
||||
discovery_events,
|
||||
]
|
||||
|
||||
export
|
||||
message_events, filter_subscribe_events, health_events, peer_events, discovery_events
|
||||
10
logos_delivery/waku/api/events/message_events.nim
Normal file
10
logos_delivery/waku/api/events/message_events.nim
Normal file
@ -0,0 +1,10 @@
|
||||
import brokers/event_broker
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/waku/[waku_core/message, waku_core/topics]
|
||||
export event_broker, types
|
||||
|
||||
EventBroker:
|
||||
# Internal event emitted when a message arrives from the network via any protocol
|
||||
type MessageSeenEvent* = object
|
||||
topic*: PubsubTopic
|
||||
message*: WakuMessage
|
||||
88
logos_delivery/waku/api/filter.nim
Normal file
88
logos_delivery/waku/api/filter.nim
Normal file
@ -0,0 +1,88 @@
|
||||
## Waku layer API — filter (light client) operations.
|
||||
import logos_delivery/waku/compat/option_valueor
|
||||
{.push raises: [].}
|
||||
|
||||
import std/options
|
||||
import results, chronos, chronicles
|
||||
|
||||
import logos_delivery/waku/waku
|
||||
import
|
||||
logos_delivery/waku/[
|
||||
waku_core,
|
||||
waku_core/subscription/push_handler,
|
||||
node/waku_node,
|
||||
node/waku_node/filter,
|
||||
node/peer_manager,
|
||||
waku_filter_v2/client,
|
||||
waku_filter_v2/common,
|
||||
]
|
||||
|
||||
const FilterOpTimeout = 5.seconds
|
||||
|
||||
proc filterSubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: PubsubTopic,
|
||||
contentTopics: seq[ContentTopic],
|
||||
pushHandler: FilterPushHandler,
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
## Registers `pushHandler` for incoming filtered messages, selects a filter
|
||||
## service peer, and subscribes.
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
self.node.wakuFilterClient.registerPushHandler(pushHandler)
|
||||
|
||||
let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
|
||||
return err("could not find peer with WakuFilterSubscribeCodec when subscribing")
|
||||
|
||||
let subFut = self.node.filterSubscribe(some(pubsubTopic), contentTopics, peer)
|
||||
if not await subFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter subscription timed out")
|
||||
subFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
## Selects a filter service peer and unsubscribes the given content topics.
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
|
||||
return err("could not find peer with WakuFilterSubscribeCodec when unsubscribing")
|
||||
|
||||
let unsubFut = self.node.filterUnsubscribe(some(pubsubTopic), contentTopics, peer)
|
||||
if not await unsubFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter un-subscription timed out")
|
||||
unsubFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribeAll*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
## Selects a filter service peer and unsubscribes from everything.
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let peer = self.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
|
||||
return
|
||||
err("could not find peer with WakuFilterSubscribeCodec when unsubscribing all")
|
||||
|
||||
let unsubFut = self.node.filterUnsubscribeAll(peer)
|
||||
if not await unsubFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter un-subscription all timed out")
|
||||
unsubFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
13
logos_delivery/waku/api/health.nim
Normal file
13
logos_delivery/waku/api/health.nim
Normal file
@ -0,0 +1,13 @@
|
||||
## Waku layer API — health / connectivity.
|
||||
{.push raises: [].}
|
||||
|
||||
import results, chronos, chronicles
|
||||
|
||||
import logos_delivery/waku/waku
|
||||
import logos_delivery/waku/[node/health_monitor, node/health_monitor/online_monitor]
|
||||
|
||||
proc isOnline*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
return ok(self.healthMonitor.onlineMonitor.amIOnline())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
37
logos_delivery/waku/api/lightpush.nim
Normal file
37
logos_delivery/waku/api/lightpush.nim
Normal file
@ -0,0 +1,37 @@
|
||||
## Waku layer API — lightpush (light client publish) operations.
|
||||
import logos_delivery/waku/compat/option_valueor
|
||||
{.push raises: [].}
|
||||
|
||||
import results, chronos, chronicles
|
||||
|
||||
import logos_delivery/waku/waku
|
||||
import
|
||||
logos_delivery/waku/[
|
||||
waku_core,
|
||||
waku_core/codecs,
|
||||
node/waku_node,
|
||||
node/peer_manager,
|
||||
waku_lightpush_legacy/client,
|
||||
]
|
||||
|
||||
proc lightpushPublish*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
): Future[Result[string, string]] {.async.} =
|
||||
## Selects a lightpush service peer and publishes; returns the message hash.
|
||||
try:
|
||||
if self.node.wakuLegacyLightpushClient.isNil():
|
||||
return err("wakuLegacyLightpushClient is not mounted")
|
||||
|
||||
let remotePeer = self.node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
|
||||
return err("failed to lightpublish message, no suitable remote peers")
|
||||
|
||||
let msgHashHex = (
|
||||
await self.node.wakuLegacyLightpushClient.publish(
|
||||
pubsubTopic, message, remotePeer
|
||||
)
|
||||
).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(msgHashHex)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
112
logos_delivery/waku/api/peer_manager.nim
Normal file
112
logos_delivery/waku/api/peer_manager.nim
Normal file
@ -0,0 +1,112 @@
|
||||
## Waku layer API — peer management operations.
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[options, sequtils, strutils]
|
||||
import results, chronos, chronicles
|
||||
import libp2p/[peerid, peerstore]
|
||||
|
||||
import logos_delivery/waku/waku
|
||||
import logos_delivery/waku/[waku_core, node/waku_node, node/peer_manager]
|
||||
|
||||
type PeerConnInfo* = object ## structured connected-peer info for the api boundary
|
||||
peerId*: string
|
||||
protocols*: seq[string]
|
||||
addresses*: seq[string]
|
||||
|
||||
proc connect*(
|
||||
self: Waku, peers: seq[string], timeoutMs: uint32
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectPeerById*(
|
||||
self: Waku, peerId: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
await self.node.peerManager.disconnectNode(pId)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
await self.node.peerManager.disconnectAllPeers()
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeer*(
|
||||
self: Waku, peerAddr: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let remotePeerInfo = parsePeerInfo(peerAddr).valueOr:
|
||||
return err($error)
|
||||
let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol)
|
||||
if conn.isNone():
|
||||
return err("failed dialing peer")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeerById*(
|
||||
self: Waku, peerId: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
let conn = await self.node.peerManager.dialPeer(pId, protocol)
|
||||
if conn.isNone():
|
||||
return err("failed dialing peer")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeersInfo*(
|
||||
self: Waku
|
||||
): Future[Result[seq[PeerConnInfo], string]] {.async.} =
|
||||
## Structured info (protocols, addresses) for every connected peer.
|
||||
try:
|
||||
var infos: seq[PeerConnInfo]
|
||||
for peer in self.node.peerManager.switch.peerStore.peers():
|
||||
if peer.connectedness == Connected:
|
||||
infos.add(
|
||||
PeerConnInfo(
|
||||
peerId: $peer.peerId,
|
||||
protocols: peer.protocols,
|
||||
addresses: peer.addrs.mapIt($it),
|
||||
)
|
||||
)
|
||||
return ok(infos)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers()
|
||||
return ok(concat(inPeerIds, outPeerIds).mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsByProtocol*(
|
||||
self: Waku, protocol: string
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(
|
||||
self.node.peerManager.switch.peerStore
|
||||
.peers(protocol)
|
||||
.filterIt(it.connectedness == Connected)
|
||||
.mapIt($it.peerId)
|
||||
)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
45
logos_delivery/waku/api/ping.nim
Normal file
45
logos_delivery/waku/api/ping.nim
Normal file
@ -0,0 +1,45 @@
|
||||
## Waku layer API — ping operation.
|
||||
{.push raises: [].}
|
||||
|
||||
import results, chronos, chronicles
|
||||
import libp2p/protocols/ping
|
||||
import libp2p/switch
|
||||
|
||||
import logos_delivery/waku/waku
|
||||
import logos_delivery/waku/[waku_core, node/waku_node, node/waku_node/ping]
|
||||
|
||||
proc pingPeer*(
|
||||
self: Waku, peerAddr: string, timeoutMs: int
|
||||
): Future[Result[int64, string]] {.async.} =
|
||||
## Pings the peer; `timeoutMs <= 0` means no timeout. Returns RTT in nanos.
|
||||
try:
|
||||
let peerInfo = parsePeerInfo(peerAddr).valueOr:
|
||||
return err("pingPeer failed to parse peer addr: " & $error)
|
||||
|
||||
proc doPing(): Future[Result[Duration, string]] {.async.} =
|
||||
try:
|
||||
let conn =
|
||||
await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
|
||||
defer:
|
||||
await conn.close()
|
||||
let rtt = await self.node.libp2pPing.ping(conn)
|
||||
if rtt == 0.nanos:
|
||||
return err("could not ping peer: rtt-0")
|
||||
return ok(rtt)
|
||||
except CatchableError as e:
|
||||
return err("could not ping peer: " & e.msg)
|
||||
|
||||
let pingFut = doPing()
|
||||
let rtt: Duration =
|
||||
if timeoutMs <= 0:
|
||||
(await pingFut).valueOr:
|
||||
return err(error)
|
||||
else:
|
||||
if not await pingFut.withTimeout(chronos.milliseconds(timeoutMs)):
|
||||
return err("ping timed out")
|
||||
pingFut.read().valueOr:
|
||||
return err(error)
|
||||
|
||||
return ok(rtt.nanos)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
132
logos_delivery/waku/api/relay.nim
Normal file
132
logos_delivery/waku/api/relay.nim
Normal file
@ -0,0 +1,132 @@
|
||||
## Waku layer API — relay (gossipsub) operations.
|
||||
{.push raises: [].}
|
||||
|
||||
import std/sequtils
|
||||
import results, chronos, chronicles, secp256k1, stew/byteutils
|
||||
|
||||
import logos_delivery/waku/waku
|
||||
import
|
||||
logos_delivery/waku/[
|
||||
waku_core,
|
||||
node/waku_node,
|
||||
node/waku_node/relay,
|
||||
node/subscription_manager,
|
||||
waku_relay/protocol,
|
||||
factory/waku_conf,
|
||||
factory/validator_signed,
|
||||
]
|
||||
|
||||
proc relayPublish*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
|
||||
): Future[Result[string, string]] {.async.} =
|
||||
## Publishes `message` and returns its message hash (0x-hex).
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPublish: WakuRelay not mounted")
|
||||
|
||||
(await self.node.wakuRelay.publish(pubsubTopic, message)).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(computeMessageHash(pubsubTopic, message).to0xHex)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relaySubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: PubsubTopic,
|
||||
handler: WakuRelayHandler = WakuRelayHandler(nil),
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
## Subscribes to `pubsubTopic`. `handler` (optional) is invoked per message;
|
||||
## pass nil to subscribe without a message callback.
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relaySubscribe: WakuRelay not mounted")
|
||||
|
||||
self.node.subscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), handler).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayUnsubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayUnsubscribe: WakuRelay not mounted")
|
||||
|
||||
self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayAddProtectedShard*(
|
||||
self: Waku, clusterId: uint16, shardId: uint16, publicKey: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayAddProtectedShard: WakuRelay not mounted")
|
||||
|
||||
let pubKey = SkPublicKey.fromHex(publicKey).valueOr:
|
||||
return err("relayAddProtectedShard: invalid public key: " & $error)
|
||||
|
||||
let protectedShard = ProtectedShard(shard: shardId, key: pubKey)
|
||||
self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayConnectedPeers*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayConnectedPeers: WakuRelay not mounted")
|
||||
|
||||
let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(connPeers.mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayPeersInMesh*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPeersInMesh: WakuRelay not mounted")
|
||||
|
||||
let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(meshPeers.mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayNumPeersInMesh*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayNumPeersInMesh: WakuRelay not mounted")
|
||||
let n = self.node.wakuRelay.getNumPeersInMesh(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
return ok(n)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayNumConnectedPeers*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayNumConnectedPeers: WakuRelay not mounted")
|
||||
let n = self.node.wakuRelay.getNumConnectedPeers(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
return ok(n)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
29
logos_delivery/waku/api/store.nim
Normal file
29
logos_delivery/waku/api/store.nim
Normal file
@ -0,0 +1,29 @@
|
||||
## Waku layer API — store (historical query) operations.
|
||||
{.push raises: [].}
|
||||
|
||||
import results, chronos, chronicles
|
||||
|
||||
import logos_delivery/waku/waku
|
||||
import
|
||||
logos_delivery/waku/[waku_core, node/waku_node, waku_store/common, waku_store/client]
|
||||
|
||||
proc storeQuery*(
|
||||
self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int
|
||||
): Future[Result[StoreQueryResponse, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuStoreClient.isNil():
|
||||
return err("wakuStoreClient is not mounted")
|
||||
|
||||
let remotePeer = parsePeerInfo(peer).valueOr:
|
||||
return err("storeQuery failed to parse peer addr: " & $error)
|
||||
|
||||
let queryFut = self.node.wakuStoreClient.query(request, remotePeer)
|
||||
if not await queryFut.withTimeout(timeoutMs.milliseconds):
|
||||
return err("storeQuery timed out")
|
||||
|
||||
let queryResponse = queryFut.read().valueOr:
|
||||
return err("storeQuery failed: " & $error)
|
||||
|
||||
return ok(queryResponse)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
27
logos_delivery/waku/api/topics.nim
Normal file
27
logos_delivery/waku/api/topics.nim
Normal file
@ -0,0 +1,27 @@
|
||||
## Waku layer API — topic construction.
|
||||
{.push raises: [].}
|
||||
|
||||
import std/strformat
|
||||
import results, chronos
|
||||
|
||||
import logos_delivery/waku/waku
|
||||
import logos_delivery/waku/waku_core
|
||||
|
||||
proc buildContentTopic*(
|
||||
self: Waku, appName: string, appVersion: uint32, name: string, encoding: string
|
||||
): Future[Result[ContentTopic, string]] {.async.} =
|
||||
try:
|
||||
return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}"))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc buildPubsubTopic*(
|
||||
self: Waku, topicName: string
|
||||
): Future[Result[PubsubTopic, string]] {.async.} =
|
||||
try:
|
||||
return ok(PubsubTopic(fmt"/waku/2/{topicName}"))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} =
|
||||
return ok(DefaultPubsubTopic)
|
||||
@ -21,7 +21,7 @@ import
|
||||
import
|
||||
logos_delivery/waku/waku_core,
|
||||
logos_delivery/waku/node/peer_manager,
|
||||
logos_delivery/waku/events/discovery_events
|
||||
logos_delivery/waku/api/events/discovery_events
|
||||
|
||||
logScope:
|
||||
topics = "waku service discovery"
|
||||
|
||||
@ -1,9 +0,0 @@
|
||||
import
|
||||
./[
|
||||
message_events, delivery_events, health_events, peer_events, lifecycle_events,
|
||||
discovery_events,
|
||||
]
|
||||
|
||||
export
|
||||
message_events, delivery_events, health_events, peer_events, lifecycle_events,
|
||||
discovery_events
|
||||
@ -1,5 +1,9 @@
|
||||
import ../waku_relay, ../node/peer_manager, ../node/health_monitor/connection_status
|
||||
|
||||
# Re-export the modules that define the handler types below, so that consumers
|
||||
# of `AppCallbacks` (e.g. the FFI library) can construct the handlers.
|
||||
export waku_relay, peer_manager, connection_status
|
||||
|
||||
type AppCallbacks* = ref object
|
||||
relayHandler*: WakuRelayHandler
|
||||
topicHealthChangeHandler*: TopicHealthChangeHandler
|
||||
|
||||
@ -39,7 +39,7 @@ import
|
||||
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
|
||||
../waku_lightpush_legacy/common,
|
||||
../common/rate_limit/setting,
|
||||
../events/discovery_events
|
||||
../api/events/discovery_events
|
||||
|
||||
## Peer persistence
|
||||
|
||||
|
||||
@ -12,8 +12,8 @@ import
|
||||
logos_delivery/waku/[
|
||||
waku_relay,
|
||||
waku_rln_relay,
|
||||
events/health_events,
|
||||
events/peer_events,
|
||||
api/events/health_events,
|
||||
api/events/peer_events,
|
||||
node/waku_node,
|
||||
node/node_telemetry,
|
||||
node/peer_manager,
|
||||
|
||||
@ -20,7 +20,7 @@ import
|
||||
waku_relay/protocol,
|
||||
waku_enr/sharding,
|
||||
waku_enr/capabilities,
|
||||
events/peer_events,
|
||||
api/events/peer_events,
|
||||
common/nimchronos,
|
||||
common/enr,
|
||||
common/callbacks,
|
||||
|
||||
@ -15,9 +15,9 @@ import
|
||||
waku_filter_v2/common as filter_common,
|
||||
waku_filter_v2/client as filter_client,
|
||||
waku_filter_v2/protocol as filter_protocol,
|
||||
events/health_events,
|
||||
events/message_events,
|
||||
events/peer_events,
|
||||
api/events/health_events,
|
||||
api/events/message_events,
|
||||
api/events/peer_events,
|
||||
requests/health_requests,
|
||||
node/peer_manager,
|
||||
node/health_monitor/topic_health,
|
||||
|
||||
@ -59,9 +59,9 @@ import
|
||||
waku_mix,
|
||||
requests/node_requests,
|
||||
requests/health_requests,
|
||||
events/health_events,
|
||||
events/message_events,
|
||||
events/peer_events,
|
||||
api/events/health_events,
|
||||
api/events/message_events,
|
||||
api/events/peer_events,
|
||||
],
|
||||
logos_delivery/waku/discovery/waku_kademlia,
|
||||
logos_delivery/waku/net/[bound_ports, net_config],
|
||||
|
||||
@ -32,7 +32,7 @@ import
|
||||
node/waku_node,
|
||||
node/subscription_manager,
|
||||
node/peer_manager,
|
||||
events/message_events,
|
||||
api/events/message_events,
|
||||
]
|
||||
|
||||
export waku_relay.WakuRelayHandler
|
||||
|
||||
@ -63,8 +63,6 @@ logScope:
|
||||
# Git version in git describe format (defined at compile time)
|
||||
const git_version* {.strdefine.} = "n/a"
|
||||
|
||||
const FilterOpTimeout = 5.seconds
|
||||
|
||||
type Waku* = ref object
|
||||
stateInfo*: WakuStateInfo
|
||||
conf*: WakuConf
|
||||
@ -574,418 +572,4 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
|
||||
return ok()
|
||||
|
||||
## Kernel API realization
|
||||
##
|
||||
# --- topic construction ---
|
||||
proc buildContentTopic*(
|
||||
self: Waku, appName: string, appVersion: uint32, name: string, encoding: string
|
||||
): Future[Result[ContentTopic, string]] {.async.} =
|
||||
try:
|
||||
return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}"))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc buildPubsubTopic*(
|
||||
self: Waku, topicName: string
|
||||
): Future[Result[PubsubTopic, string]] {.async.} =
|
||||
try:
|
||||
return ok(PubsubTopic(fmt"/waku/2/{topicName}"))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} =
|
||||
return ok(DefaultPubsubTopic)
|
||||
|
||||
# --- relay ---
|
||||
proc relayPublish*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPublish: WakuRelay not mounted")
|
||||
|
||||
let numPeers = (await self.node.wakuRelay.publish(pubsubTopic, message)).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(numPeers)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relaySubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relaySubscribe: WakuRelay not mounted")
|
||||
|
||||
self.node.subscribe(
|
||||
(kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(nil)
|
||||
).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayUnsubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayUnsubscribe: WakuRelay not mounted")
|
||||
|
||||
self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayAddProtectedShard*(
|
||||
self: Waku, clusterId: uint16, shardId: uint16, publicKey: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayAddProtectedShard: WakuRelay not mounted")
|
||||
|
||||
let pubKey = SkPublicKey.fromHex(publicKey).valueOr:
|
||||
return err("relayAddProtectedShard: invalid public key: " & $error)
|
||||
|
||||
let protectedShard = ProtectedShard(shard: shardId, key: pubKey)
|
||||
self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayConnectedPeers*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayConnectedPeers: WakuRelay not mounted")
|
||||
|
||||
let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(connPeers.mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayPeersInMesh*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPeersInMesh: WakuRelay not mounted")
|
||||
|
||||
let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(meshPeers.mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- filter ---
|
||||
proc filterSubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
peer: string,
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let subFut = self.node.filterSubscribe(pubsubTopic, contentTopics, peer)
|
||||
if not await subFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter subscription timed out")
|
||||
subFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
peer: string,
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let unsubFut = self.node.filterUnsubscribe(pubsubTopic, contentTopics, peer)
|
||||
if not await unsubFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter un-subscription timed out")
|
||||
unsubFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribeAll*(
|
||||
self: Waku, peer: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
|
||||
let unsubFut = self.node.filterUnsubscribeAll(peer)
|
||||
if not await unsubFut.withTimeout(FilterOpTimeout):
|
||||
return err("filter un-subscription all timed out")
|
||||
unsubFut.read().isOkOr:
|
||||
return err($error)
|
||||
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- lightpush ---
|
||||
proc lightpushPublish*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string
|
||||
): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuLegacyLightpushClient.isNil():
|
||||
return err("wakuLegacyLightpushClient is not mounted")
|
||||
|
||||
let remotePeer = parsePeerInfo(peer).valueOr:
|
||||
return err("lightpushPublish failed to parse peer addr: " & $error)
|
||||
|
||||
let msgHashHex = (
|
||||
await self.node.wakuLegacyLightpushClient.publish(
|
||||
pubsubTopic, message, remotePeer
|
||||
)
|
||||
).valueOr:
|
||||
return err($error)
|
||||
|
||||
return ok(msgHashHex)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- store ---
|
||||
proc storeQuery*(
|
||||
self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int
|
||||
): Future[Result[StoreQueryResponse, string]] {.async.} =
|
||||
try:
|
||||
if self.node.wakuStoreClient.isNil():
|
||||
return err("wakuStoreClient is not mounted")
|
||||
|
||||
let remotePeer = parsePeerInfo(peer).valueOr:
|
||||
return err("storeQuery failed to parse peer addr: " & $error)
|
||||
|
||||
let queryFut = self.node.wakuStoreClient.query(request, remotePeer)
|
||||
if not await queryFut.withTimeout(timeoutMs.milliseconds):
|
||||
return err("storeQuery timed out")
|
||||
|
||||
let queryResponse = queryFut.read().valueOr:
|
||||
return err("storeQuery failed: " & $error)
|
||||
|
||||
return ok(queryResponse)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- peer management ---
|
||||
proc connect*(
|
||||
self: Waku, peers: seq[string], timeoutMs: uint32
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectPeerById*(
|
||||
self: Waku, peerId: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
await self.node.peerManager.disconnectNode(pId)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
await self.node.peerManager.disconnectAllPeers()
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeer*(
|
||||
self: Waku, peerAddr: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let remotePeerInfo = parsePeerInfo(peerAddr).valueOr:
|
||||
return err($error)
|
||||
let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol)
|
||||
if conn.isNone():
|
||||
return err("failed dialing peer")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeerById*(
|
||||
self: Waku, peerId: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
let conn = await self.node.peerManager.dialPeer(pId, protocol)
|
||||
if conn.isNone():
|
||||
return err("failed dialing peer")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(
|
||||
self.node.peerManager.switch.peerStore
|
||||
.peers()
|
||||
.filterIt(it.connectedness == Connected)
|
||||
.mapIt($it.peerId)
|
||||
)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers()
|
||||
return ok(concat(inPeerIds, outPeerIds).mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsByProtocol*(
|
||||
self: Waku, protocol: string
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(
|
||||
self.node.peerManager.switch.peerStore
|
||||
.peers(protocol)
|
||||
.filterIt(it.connectedness == Connected)
|
||||
.mapIt($it.peerId)
|
||||
)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- discovery ---
|
||||
proc dnsDiscovery*(
|
||||
self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
let dnsNameServers = @[parseIpAddress(nameServer)]
|
||||
let discoveredPeers = (
|
||||
await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers)
|
||||
).valueOr:
|
||||
return err("failed discovering peers from DNS: " & $error)
|
||||
|
||||
var multiAddresses = newSeq[string]()
|
||||
for discPeer in discoveredPeers:
|
||||
for address in discPeer.addrs:
|
||||
multiAddresses.add($address & "/p2p/" & $discPeer)
|
||||
|
||||
return ok(multiAddresses)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc discv5UpdateBootnodes*(
|
||||
self: Waku, bootnodes: seq[string]
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
let jsonArray = "[" & bootnodes.mapIt("\"" & it & "\"").join(",") & "]"
|
||||
self.wakuDiscv5.updateBootstrapRecords(jsonArray).isOkOr:
|
||||
return err("error in discv5UpdateBootnodes: " & $error)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
(await self.wakuDiscv5.start()).isOkOr:
|
||||
return err("error starting discv5: " & $error)
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
await self.wakuDiscv5.stop()
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerExchangeRequest*(
|
||||
self: Waku, numPeers: uint64
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
try:
|
||||
let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr:
|
||||
return err("failed peer exchange: " & $error)
|
||||
return ok(numPeersRecv)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
# --- debug / info ---
|
||||
proc version*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
return ok(WakuNodeVersionString)
|
||||
|
||||
proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.info().listenAddresses)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
return ok(self.node.enr.toURI())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
try:
|
||||
return ok($self.node.peerId())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc metrics*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
{.gcsafe.}:
|
||||
try:
|
||||
return ok(defaultRegistry.toText())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc pingPeer*(
|
||||
self: Waku, peerAddr: string, timeoutMs: int
|
||||
): Future[Result[int64, string]] {.async.} =
|
||||
try:
|
||||
let peerInfo = parsePeerInfo(peerAddr).valueOr:
|
||||
return err("pingPeer failed to parse peer addr: " & $error)
|
||||
|
||||
let conn = await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
|
||||
defer:
|
||||
await conn.close()
|
||||
let pingRTT = await self.node.libp2pPing.ping(conn)
|
||||
|
||||
if pingRTT == 0.nanos:
|
||||
return err("could not ping peer: rtt-0")
|
||||
|
||||
return ok(pingRTT.nanos)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
{.pop.}
|
||||
|
||||
@ -14,7 +14,7 @@ import
|
||||
brokers/broker_context
|
||||
|
||||
import
|
||||
logos_delivery/waku/[node/peer_manager, waku_core, events/delivery_events],
|
||||
logos_delivery/waku/[node/peer_manager, waku_core, api/events/filter_subscribe_events],
|
||||
./common,
|
||||
./protocol_metrics,
|
||||
./rpc_codec,
|
||||
|
||||
@ -24,9 +24,9 @@ import
|
||||
logos_delivery/waku/waku_core,
|
||||
logos_delivery/waku/node/health_monitor/topic_health,
|
||||
logos_delivery/waku/requests/health_requests,
|
||||
logos_delivery/waku/events/health_events,
|
||||
logos_delivery/waku/api/events/health_events,
|
||||
./message_id,
|
||||
logos_delivery/waku/events/peer_events
|
||||
logos_delivery/waku/api/events/peer_events
|
||||
|
||||
from logos_delivery/waku/waku_core/codecs import WakuRelayCodec
|
||||
export WakuRelayCodec
|
||||
|
||||
@ -12,7 +12,7 @@ import
|
||||
[topic_health, health_status, protocol_health, health_report],
|
||||
logos_delivery/waku/requests/health_requests,
|
||||
logos_delivery/waku/requests/node_requests,
|
||||
logos_delivery/waku/events/health_events,
|
||||
logos_delivery/waku/api/events/health_events,
|
||||
logos_delivery/waku/common/waku_protocol,
|
||||
logos_delivery/waku/factory/waku_conf
|
||||
import tools/confutils/cli_args
|
||||
|
||||
@ -14,8 +14,7 @@ import
|
||||
logos_delivery/waku/[
|
||||
waku_node,
|
||||
waku_core,
|
||||
events/message_events,
|
||||
events/health_events,
|
||||
api/events/health_events,
|
||||
waku_relay/protocol,
|
||||
waku_archive,
|
||||
waku_archive/common as archive_common,
|
||||
|
||||
@ -12,7 +12,6 @@ import
|
||||
logos_delivery/waku/[
|
||||
waku_node,
|
||||
waku_core,
|
||||
events/message_events,
|
||||
waku_relay/protocol,
|
||||
node/waku_node/filter,
|
||||
node/subscription_manager,
|
||||
|
||||
@ -10,7 +10,7 @@ import ../testlib/[common, wakucore, wakunode, testasync]
|
||||
import logos_delivery
|
||||
import logos_delivery/waku/[waku_node, waku_core]
|
||||
import logos_delivery/waku/factory/waku_conf
|
||||
import logos_delivery/waku/events/message_events as waku_message_events
|
||||
import logos_delivery/messaging/api/events as waku_message_events
|
||||
import tools/confutils/cli_args
|
||||
|
||||
import logos_delivery/channels/reliable_channel_manager
|
||||
|
||||
@ -19,8 +19,8 @@ import
|
||||
node/waku_node/store,
|
||||
node/waku_node/lightpush,
|
||||
node/waku_node/filter,
|
||||
events/health_events,
|
||||
events/peer_events,
|
||||
api/events/health_events,
|
||||
api/events/peer_events,
|
||||
waku_archive,
|
||||
]
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user