NagyZoltanPeter a7f893555d
Integrate api-shape phase2 (#3989) + api interfaces (#3975) (#3999)
* Reshape per-layer API into api/ folders and thin the FFI over them

Each layer now separates its constructible core from its public surface:

  - core module (waku.nim / messaging_client.nim /
    reliable_channel_manager.nim): the type plus new/start/stop and the
    private construction helpers.
  - api/ folder: one module per differentiated set of operations
    (waku: topics/relay/filter/lightpush/store/peer_manager/discovery/
    debug/health) plus an events surface.

The waku api is reshaped to be the complete operation surface the C
bindings need, so the library no longer reaches into node internals:
relayPublish returns the message hash, relaySubscribe takes an optional
handler, filter/lightpush auto-select the service peer, connectedPeersInfo
returns structured data, pingPeer honours the timeout, plus
relayNumPeersInMesh / relayNumConnectedPeers / isOnline. library/ is now a
thin C-ABI shim: each {.ffi.} proc only marshals cstring/JSON/callbacks and
delegates to ctx.myLib[].waku.<op> (or messagingClient.<op>).
app_callbacks re-exports the modules defining its handler types, which the
included FFI files previously relied on by leakage.

Events move next to the surface that owns them, with each dependency kept
pointing the right way:

  - waku/events/ relocated under waku/api/events/.
  - channel events live in channels/api/events.nim.
  - the four messaging-level message events move to messaging/api/events;
    MessageSeenEvent stays in waku because it is emitted by waku core, so
    moving it would make waku depend on the messaging layer.
  - delivery_events renamed to filter_subscribe_events to match the
    OnFilterSubscribe/Unsubscribe events it actually declares.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Add reliable-channel FFI ops + events (nim-ffi v0.1.3)

Expose the reliable-channel layer through the v0.1.3 FFI:
- channel_create / channel_send / channel_close call the
  ReliableChannelManager api (createReliableChannel / send / closeChannel),
  marshalling channel id + base64 payload + ephemeral by hand
- channel message received / sent / errored are surfaced by listening to the
  channel-layer broker events in start_node and forwarding them through
  callEventCallback (received payload base64-encoded), dropped in stop_node

Stays on nim-ffi v0.1.3 (no typed/CBOR rewrite).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Expose reliable-channel ops in the stable C header (#3851)

The library already ships as a single .so with a tiered header surface
(liblogosdelivery.h = stable Messaging/Reliable-Channels, liblogosdelivery_kernel.h
= advanced Kernel). Per that tiering, the reliable-channel ops belong on the
stable surface, so declare channel_create / channel_send / channel_close in
liblogosdelivery.h and document the channel lifecycle events delivered through
the event callback.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Graft PR#3975 interface layer onto decomposed foundation (events deduped)

Add IKernel/IMessagingClient/IReliableChannelManager/ILogosDelivery interface
classes under logos_delivery/api/. The EventBroker types PR#3975 hoisted into
these files already exist in PR#3989's decomposed */api/events/ modules, so the
interface files re-export those modules instead of redefining the types
(avoids 8 duplicate EventBroker definitions). api/types.nim kept at the
foundation version (ChannelId stays in channels/types.nim, which the decomposed
modules import).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Wire impl classes to interfaces (inherit; relocate SendHandler)

- Waku : IKernel, MessagingClient : IMessagingClient,
  ReliableChannelManager : IReliableChannelManager.
- The operation procs already live in PR#3989's decomposed */api/ modules and
  stay as plain procs (nothing dispatches through the interface types, so no
  method-ization is needed).
- SendHandler now lives in reliable_channel_manager_api.nim (its PR#3975 home);
  removed the duplicate from reliable_channel.nim, which re-exports the
  interface module so channels/api/{channel_lifecycle,send} still see it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Wire LogosDelivery to ILogosDelivery orchestrator interface

LogosDelivery : ILogosDelivery; start/stop/isOnline become method overrides.
Peripheral PR#3975 edits (lightpush/store clients, self_req_handlers,
statistics) are import-reorg artifacts of deleting waku/utils/requests.nim,
which the decomposed structure keeps -- so they are intentionally not ported.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Dedup EventConnectionStatusChange (re-export from health_events)

9th duplicate EventBroker type: defined in both logos_delivery_api.nim and the
decomposed waku/api/events/health_events.nim. The interface file now re-exports
it. liblogosdelivery builds clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Move events back into interface-class source files (restore #3975 placement)

Reverses the earlier dedup-by-re-export: event TYPE definitions now live in the
interface classes, and the emptied decomposed event files are removed.

- MessageSeenEvent            -> logos_delivery/api/kernel_api.nim
- Message{Sent,Error,Propagated,Received}Event -> api/messaging_client_api.nim
- ChannelMessage{Received,Sent,Error}Event     -> api/reliable_channel_manager_api.nim
- EventConnectionStatusChange -> api/logos_delivery_api.nim

Deleted (became empty after the move):
- logos_delivery/waku/api/events/message_events.nim
- logos_delivery/messaging/api/events.nim
- logos_delivery/channels/api/events.nim
health_events.nim keeps its two remaining events (content/shard topic health).

Rewiring: each layer re-exports its interface module (waku->kernel_api,
messaging_client->messaging_client_api, reliable_channel->reliable_channel_manager_api,
which also re-exports messaging_client_api). Deep emitters/listeners
(subscription_manager, waku_node, waku_node/relay, node_health_monitor,
recv_service, send_service) import the owning interface module directly.
kernel_api stays below node level (types/topics/message/store-common) so the
node->kernel_api imports are acyclic. liblogosdelivery builds.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* nph formatting

---------

Co-authored-by: Ivan FB <ivansete@status.im>
Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 11:51:22 +02:00

697 lines
23 KiB
Nim
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logos_delivery/waku/compat/option_valueor
{.push raises: [].}
import
std/[options, tables, strutils, sequtils, os, net, random, sets],
chronos,
chronicles,
metrics,
results,
eth/keys,
nimcrypto,
bearssl/rand,
stew/byteutils,
eth/p2p/discoveryv5/enr,
libp2p/crypto/crypto,
libp2p/crypto/curve25519,
libp2p/[multiaddress, multicodec],
libp2p/protocols/ping,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/rpc/messages,
libp2p/builders,
libp2p/transports/transport,
libp2p/transports/tcptransport,
libp2p/transports/wstransport,
libp2p/utility,
libp2p/utils/offsettedseq,
libp2p_mix,
libp2p_mix/mix_protocol,
brokers/broker_context,
brokers/request_broker
import
logos_delivery/waku/[
waku_core,
waku_core/topics/sharding,
waku_relay,
waku_archive,
waku_store/protocol as store,
waku_store/client as store_client,
waku_store/common as store_common,
waku_store/resume,
waku_store_sync,
waku_filter_v2,
waku_filter_v2/client as filter_client,
waku_metadata,
waku_rendezvous/protocol,
waku_rendezvous/client as rendezvous_client,
waku_rendezvous/waku_peer_record,
waku_lightpush_legacy/client as legacy_ligntpuhs_client,
waku_lightpush_legacy as legacy_lightpush_protocol,
waku_lightpush/client as ligntpuhs_client,
waku_lightpush as lightpush_protocol,
waku_enr,
waku_peer_exchange,
rln,
common/rate_limit/setting,
common/callbacks,
common/nimchronos,
waku_mix,
requests/node_requests,
requests/health_requests,
api/events/health_events,
api/events/peer_events,
],
logos_delivery/api/kernel_api, # MessageSeenEvent
logos_delivery/waku/discovery/waku_kademlia,
logos_delivery/waku/net/[bound_ports, net_config],
./peer_manager,
./health_monitor/health_status,
./health_monitor/topic_health,
./node_telemetry,
./shard_subscription,
./edge_filter_sub_state
export shard_subscription, edge_filter_sub_state
logScope:
topics = "waku node"
# randomize initializes sdt/random's random number generator
# if not called, the outcome of randomization procedures will be the same in every run
randomize()
# TODO: Move to application instance (e.g., `WakuNode2`)
# Git version in git describe format (defined compile time)
const git_version* {.strdefine.} = "n/a"
# Default clientId
const clientId* = "Nimbus Waku v2 node"
const WakuNodeVersionString* = "version / git commit hash: " & git_version
type
# TODO: Move to application instance (e.g., `WakuNode2`)
WakuInfo* = object # NOTE One for simplicity, can extend later as needed
listenAddresses*: seq[string]
enrUri*: string #multiaddrStrings*: seq[string]
mixPubKey*: Option[string]
# NOTE based on Eth2Node in NBC eth2_network.nim
WakuNode* = ref object
peerManager*: PeerManager
switch*: Switch
wakuRelay*: WakuRelay
wakuArchive*: waku_archive.WakuArchive
wakuStore*: store.WakuStore
wakuStoreClient*: store_client.WakuStoreClient
wakuStoreResume*: StoreResume
wakuStoreReconciliation*: SyncReconciliation
wakuStoreTransfer*: SyncTransfer
wakuFilter*: waku_filter_v2.WakuFilter
wakuFilterClient*: filter_client.WakuFilterClient
rln*: Rln
wakuLegacyLightPush*: WakuLegacyLightPush
wakuLegacyLightpushClient*: WakuLegacyLightPushClient
wakuLightPush*: WakuLightPush
wakuLightpushClient*: WakuLightPushClient
wakuPeerExchange*: WakuPeerExchange
wakuPeerExchangeClient*: WakuPeerExchangeClient
wakuMetadata*: WakuMetadata
wakuAutoSharding*: Option[Sharding]
enr*: enr.Record
libp2pPing*: Ping
rng*: crypto.Rng
brokerCtx*: BrokerContext
wakuRendezvous*: WakuRendezVous
wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient
announcedAddresses*: seq[MultiAddress]
extMultiAddrsOnly*: bool # When true, skip automatic IP address replacement
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
rateLimitSettings*: ProtocolRateLimitSettings
legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler]
## Kernel API Relay appHandlers (if any)
subscriptionManager*: SubscriptionManager
wakuMix*: WakuMix
wakuKademlia*: WakuKademlia
ports*: BoundPorts
relayReconnectFut*: Future[void]
SubscriptionManager* = ref object of RootObj
node*: WakuNode
shards*: Table[PubsubTopic, ShardSubscription]
edgeFilterSubStates*: Table[PubsubTopic, EdgeFilterSubState]
edgeFilterWakeup*: AsyncEvent
edgeFilterSubLoopFut*: Future[void]
edgeFilterConnectionLoopFut*: Future[void]
peerEventListener*: WakuPeerEventListener
ownsEdgeShardHealthProvider*: bool
ownsEdgeFilterPeerCountProvider*: bool
import ./subscription_manager
proc deduceRelayShard(
node: WakuNode,
contentTopic: ContentTopic,
pubsubTopicOp: Option[PubsubTopic] = none[PubsubTopic](),
): Result[RelayShard, string] =
let pubsubTopic = pubsubTopicOp.valueOr:
if node.wakuAutoSharding.isNone():
return err("Pubsub topic must be specified when static sharding is enabled.")
let shard = node.wakuAutoSharding.get().getShard(contentTopic).valueOr:
let msg = "Deducing shard failed: " & error
return err(msg)
return ok(shard)
let shard = RelayShard.parse(pubsubTopic).valueOr:
return err("Invalid topic:" & pubsubTopic & " " & $error)
return ok(shard)
proc getShardsGetter(node: WakuNode, configuredShards: seq[uint16]): GetShards =
return proc(): seq[uint16] {.closure, gcsafe, raises: [].} =
# fetch pubsubTopics subscribed to relay and convert them to shards
if node.wakuRelay.isNil():
# If relay is not mounted, return configured shards
return configuredShards
let subscribedTopics = node.wakuRelay.subscribedTopics()
# If relay hasn't subscribed to any topics yet, return configured shards
if subscribedTopics.len == 0:
return configuredShards
let relayShards = topicsToRelayShards(subscribedTopics).valueOr:
error "could not convert relay topics to shards",
error = $error, topics = subscribedTopics
# Fall back to configured shards on error
return configuredShards
if relayShards.isSome():
let shards = relayShards.get().shardIds
return shards
return configuredShards
proc getCapabilitiesGetter(node: WakuNode): GetCapabilities =
return proc(): seq[Capabilities] {.closure, gcsafe, raises: [].} =
if node.wakuRelay.isNil():
return @[]
return node.enr.getCapabilities()
proc getWakuPeerRecordGetter(node: WakuNode): GetWakuPeerRecord =
return proc(): WakuPeerRecord {.closure, gcsafe, raises: [].} =
var mixKey: string
if not node.wakuMix.isNil():
mixKey = node.wakuMix.pubKey.to0xHex()
return WakuPeerRecord.init(
peerId = node.switch.peerInfo.peerId,
addresses = node.announcedAddresses,
mixKey = mixKey,
)
proc new*(
T: type WakuNode,
netConfig: NetConfig,
enr: enr.Record,
switch: Switch,
peerManager: PeerManager,
rateLimitSettings: ProtocolRateLimitSettings = DefaultProtocolRateLimit,
# TODO: make this argument required after tests are updated
rng: crypto.Rng = crypto.newRng(),
): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
## Creates a Waku Node instance.
info "Initializing networking", addrs = $netConfig.announcedAddresses
let brokerCtx = globalBrokerContext()
let queue = newAsyncEventQueue[SubscriptionEvent](0)
let node = WakuNode(
peerManager: peerManager,
switch: switch,
rng: rng,
brokerCtx: brokerCtx,
enr: enr,
announcedAddresses: netConfig.announcedAddresses,
topicSubscriptionQueue: queue,
rateLimitSettings: rateLimitSettings,
ports: BoundPorts.init(),
)
peerManager.setShardGetter(node.getShardsGetter(@[]))
node.subscriptionManager = SubscriptionManager.new(node)
return node
proc peerInfo*(node: WakuNode): PeerInfo =
node.switch.peerInfo
proc peerId*(node: WakuNode): PeerId =
node.peerInfo.peerId
# TODO: Move to application instance (e.g., `WakuNode2`)
# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc
proc info*(node: WakuNode): WakuInfo =
## Returns information about the Node, such as what multiaddress it can be reached at.
let peerInfo = node.switch.peerInfo
var listenStr: seq[string]
for address in node.announcedAddresses:
var fulladdr = $address & "/p2p/" & $peerInfo.peerId
listenStr &= fulladdr
let enrUri = node.enr.toUri()
var wakuInfo = WakuInfo(listenAddresses: listenStr, enrUri: enrUri)
if not node.wakuMix.isNil():
let keyStr = node.wakuMix.pubKey.to0xHex()
wakuInfo.mixPubKey = some(keyStr)
info "node info", wakuInfo
return wakuInfo
proc connectToNodes*(
node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api"
) {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
# NOTE Connects to the node without a give protocol, which automatically creates streams for relay
await peer_manager.connectToNodes(node.peerManager, nodes, source = source)
proc disconnectNode*(node: WakuNode, remotePeer: RemotePeerInfo) {.async.} =
await peer_manager.disconnectNode(node.peerManager, remotePeer)
proc mountMetadata*(
node: WakuNode, clusterId: uint32, shards: seq[uint16]
): Result[void, string] =
if not node.wakuMetadata.isNil():
return err("Waku metadata already mounted, skipping")
let metadata = WakuMetadata.new(clusterId, node.getShardsGetter(shards))
node.wakuMetadata = metadata
node.peerManager.wakuMetadata = metadata
let catchRes = catch:
node.switch.mount(node.wakuMetadata, protocolMatcher(WakuMetadataCodec))
catchRes.isOkOr:
return err(error.msg)
return ok()
## Waku AutoSharding
proc mountAutoSharding*(
node: WakuNode, clusterId: uint16, shardCount: uint32
): Result[void, string] =
info "mounting auto sharding", clusterId = clusterId, shardCount = shardCount
node.wakuAutoSharding =
some(Sharding(clusterId: clusterId, shardCountGenZero: shardCount))
return ok()
proc getMixNodePoolSize*(node: WakuNode): int =
return node.wakuMix.poolSize()
proc mountMix*(
node: WakuNode,
clusterId: uint16,
mixPrivKey: Curve25519Key,
mixnodes: seq[MixNodePubInfo],
): Future[Result[void, string]] {.async.} =
info "mounting mix protocol", nodeId = node.info #TODO log the config used
if node.announcedAddresses.len == 0:
return err("Trying to mount mix without having announced addresses")
let localaddrStr = node.announcedAddresses[0].toString().valueOr:
return err("Failed to convert multiaddress to string.")
info "local addr", localaddr = localaddrStr
node.wakuMix = WakuMix.new(
localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes
).valueOr:
error "Waku Mix protocol initialization failed", err = error
return
#TODO: should we do the below only for exit node? Also, what if multiple protocols use mix?
node.wakuMix.registerDestReadBehavior(WakuLightPushCodec, readLp(int(-1)))
let catchRes = catch:
node.switch.mount(node.wakuMix)
catchRes.isOkOr:
return err(error.msg)
return ok()
proc mountKademlia*(
node: WakuNode, config: KademliaDiscoveryConf
): Result[void, string] =
if not node.wakuKademlia.isNil():
return err("WakuKademlia already mounted, skipping")
let wk = WakuKademlia.new(
node.switch, node.peerManager, config.bootstrapNodes, config.servicesToAdvertise,
config.servicesToDiscover, config.randomLookupInterval,
config.serviceLookupInterval, node.rng, config.kadDhtConfig, config.discoConfig,
config.clientMode, config.xprPublishing,
).valueOr:
return err("failed to create service discovery: " & error)
node.wakuKademlia = wk
let mountRes = catch:
node.switch.mount(wk.protocol)
mountRes.isOkOr:
return err("failed to mount service discovery: " & error.msg)
return ok()
## Waku Sync
proc mountStoreSync*(
node: WakuNode,
cluster: uint16,
shards: seq[uint16],
contentTopics: seq[string],
storeSyncRange: uint32,
storeSyncInterval: uint32,
storeSyncRelayJitter: uint32,
): Future[Result[void, string]] {.async.} =
let idsChannel = newAsyncQueue[(SyncID, PubsubTopic, ContentTopic)](0)
let wantsChannel = newAsyncQueue[(PeerId)](0)
let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](0)
let pubsubTopics = shards.mapIt($RelayShard(clusterId: cluster, shardId: it))
let recon = ?await SyncReconciliation.new(
pubsubTopics, contentTopics, node.peerManager, node.wakuArchive,
storeSyncRange.seconds, storeSyncInterval.seconds, storeSyncRelayJitter.seconds,
idsChannel, wantsChannel, needsChannel,
)
node.wakuStoreReconciliation = recon
let reconMountRes = catch:
node.switch.mount(
node.wakuStoreReconciliation, protocolMatcher(WakuReconciliationCodec)
)
reconMountRes.isOkOr:
return err(error.msg)
let transfer = SyncTransfer.new(
node.peerManager, node.wakuArchive, idsChannel, wantsChannel, needsChannel
)
node.wakuStoreTransfer = transfer
let transMountRes = catch:
node.switch.mount(node.wakuStoreTransfer, protocolMatcher(WakuTransferCodec))
transMountRes.isOkOr:
return err(error.msg)
return ok()
proc reconnectRelayPeers*(node: WakuNode) {.async.} =
## Reconnect to previously-seen WakuRelay peers.
if node.wakuRelay.isNil():
return
if not node.peerManager.switch.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)):
return
info "Found previous WakuRelay peers. Reconnecting."
let backoffPeriod =
node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
await node.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod)
proc selectRandomPeers*(peers: seq[PeerId], numRandomPeers: int): seq[PeerId] =
var randomPeers = peers
shuffle(randomPeers)
return randomPeers[0 ..< min(len(randomPeers), numRandomPeers)]
proc mountRendezvousClient*(node: WakuNode, clusterId: uint16) {.async: (raises: []).} =
info "mounting rendezvous client"
node.wakuRendezvousClient = rendezvous_client.WakuRendezVousClient.new(
node.switch, node.peerManager, clusterId
).valueOr:
error "initializing waku rendezvous client failed", error = error
return
if node.started:
await node.wakuRendezvousClient.start()
proc mountRendezvous*(
node: WakuNode, clusterId: uint16, shards: seq[RelayShard] = @[]
) {.async: (raises: []).} =
info "mounting rendezvous discovery protocol"
let configuredShards = shards.mapIt(it.shardId)
node.wakuRendezvous = WakuRendezVous.new(
node.switch,
node.peerManager,
clusterId,
node.getShardsGetter(configuredShards),
node.getCapabilitiesGetter(),
node.getWakuPeerRecordGetter(),
).valueOr:
error "initializing waku rendezvous failed", error = error
return
if node.started:
try:
await node.wakuRendezvous.start()
except CancelledError as exc:
error "failed to start wakuRendezvous", error = exc.msg
try:
node.switch.mount(node.wakuRendezvous, protocolMatcher(WakuRendezVousCodec))
except LPError:
error "failed to mount wakuRendezvous", error = getCurrentExceptionMsg()
proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =
let inputStr = $inputMultiAdd
if inputStr.contains("0.0.0.0/tcp/0") or inputStr.contains("127.0.0.1/tcp/0"):
return true
return false
proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] =
# Skip automatic IP replacement if extMultiAddrsOnly is set
# This respects the user's explicitly configured announced addresses
if node.extMultiAddrsOnly:
return ok()
let peerInfo = node.switch.peerInfo
var announcedStr = ""
var listenStr = ""
var localIp = "0.0.0.0"
try:
localIp = $getPrimaryIPAddr()
except Exception as e:
warn "Could not retrieve localIp", msg = e.msg
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
## Update the WakuNode addresses
var newAnnouncedAddresses = newSeq[MultiAddress](0)
for address in node.announcedAddresses:
## Replace "0.0.0.0" or "127.0.0.1" with the localIp
let newAddr = ($address).replace("0.0.0.0", localIp).replace("127.0.0.1", localIp)
let fulladdr = "[" & $newAddr & "/p2p/" & $peerInfo.peerId & "]"
announcedStr &= fulladdr
let newMultiAddr = MultiAddress.init(newAddr).valueOr:
return err("error in updateAnnouncedAddrWithPrimaryIpAddr: " & $error)
newAnnouncedAddresses.add(newMultiAddr)
node.announcedAddresses = newAnnouncedAddresses
## Update the Switch addresses
node.switch.peerInfo.addrs = newAnnouncedAddresses
for transport in node.switch.transports:
for address in transport.addrs:
let fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
listenStr &= fulladdr
info "Listening on",
full = listenStr, localIp = localIp, switchAddress = $(node.switch.peerInfo.addrs)
info "Announcing addresses", full = announcedStr
info "DNS: discoverable ENR ", enr = node.enr.toUri()
return ok()
proc startProvidersAndListeners*(node: WakuNode) =
RequestRelayShard.setProvider(
node.brokerCtx,
proc(
pubsubTopic: Option[PubsubTopic], contentTopic: ContentTopic
): Result[RequestRelayShard, string] =
let shard = node.deduceRelayShard(contentTopic, pubsubTopic).valueOr:
return err($error)
return ok(RequestRelayShard(relayShard: shard)),
).isOkOr:
error "Can't set provider for RequestRelayShard", error = error
RequestShardTopicsHealth.setProvider(
node.brokerCtx,
proc(topics: seq[PubsubTopic]): Result[RequestShardTopicsHealth, string] =
var response: RequestShardTopicsHealth
for shard in topics:
# Health resolution order:
# 1. Relay topicsHealth (computed from gossipsub mesh state)
# 2. If relay is active but topicsHealth hasn't computed yet, UNHEALTHY
# 3. Otherwise, ask edge filter (via broker; no-op if no provider set)
var healthStatus = TopicHealth.NOT_SUBSCRIBED
if not node.wakuRelay.isNil:
healthStatus =
node.wakuRelay.topicsHealth.getOrDefault(shard, TopicHealth.NOT_SUBSCRIBED)
if healthStatus == TopicHealth.NOT_SUBSCRIBED:
if not node.wakuRelay.isNil and node.wakuRelay.isSubscribed(shard):
healthStatus = TopicHealth.UNHEALTHY
else:
let edgeRes = RequestEdgeShardHealth.request(node.brokerCtx, shard)
if edgeRes.isOk():
healthStatus = edgeRes.get().health
response.topicHealth.add((shard, healthStatus))
return ok(response),
).isOkOr:
error "Can't set provider for RequestShardTopicsHealth", error = error
RequestContentTopicsHealth.setProvider(
node.brokerCtx,
proc(topics: seq[ContentTopic]): Result[RequestContentTopicsHealth, string] =
var response: RequestContentTopicsHealth
for contentTopic in topics:
var topicHealth = TopicHealth.NOT_SUBSCRIBED
let shardResult = node.deduceRelayShard(contentTopic, none[PubsubTopic]())
if shardResult.isOk():
let shardObj = shardResult.get()
let pubsubTopic = $shardObj
if not isNil(node.wakuRelay):
topicHealth = node.wakuRelay.topicsHealth.getOrDefault(
pubsubTopic, TopicHealth.NOT_SUBSCRIBED
)
if topicHealth == TopicHealth.NOT_SUBSCRIBED:
let edgeRes = RequestEdgeShardHealth.request(node.brokerCtx, pubsubTopic)
if edgeRes.isOk():
topicHealth = edgeRes.get().health
response.contentTopicHealth.add((topic: contentTopic, health: topicHealth))
return ok(response),
).isOkOr:
error "Can't set provider for RequestContentTopicsHealth", error = error
proc stopProvidersAndListeners*(node: WakuNode) =
RequestRelayShard.clearProvider(node.brokerCtx)
RequestContentTopicsHealth.clearProvider(node.brokerCtx)
RequestShardTopicsHealth.clearProvider(node.brokerCtx)
proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node and
## all its mounted protocols.
waku_version.set(1, labelValues = [git_version])
info "Starting Waku node", version = git_version
var zeroPortPresent = false
for address in node.announcedAddresses:
if isBindIpWithZeroPort(address):
zeroPortPresent = true
if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.start()
if not node.wakuRendezvousClient.isNil():
await node.wakuRendezvousClient.start()
## The switch uses this mapper to update peer info addrs
## with announced addrs after start
let addressMapper = proc(
listenAddrs: seq[MultiAddress]
): Future[seq[MultiAddress]] {.gcsafe, async: (raises: [CancelledError]).} =
return node.announcedAddresses
node.switch.peerInfo.addressMappers.add(addressMapper)
## The switch will update addresses after start using the addressMapper
## NOTE: This will dispatch gossipsub start to the WakuRelay.start method override
await node.switch.start()
# Reconnect to known relay peers in the background; it waits a prune backoff
# and must not block startup.
node.relayReconnectFut = node.reconnectRelayPeers()
node.started = true
if not node.wakuKademlia.isNil():
await node.wakuKademlia.start()
if not node.wakuFilterClient.isNil():
node.wakuFilterClient.registerPushHandler(
proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
MessageSeenEvent.emit(node.brokerCtx, pubsubTopic, msg)
)
node.startProvidersAndListeners()
node.subscriptionManager.start().isOkOr:
error "failed to start subscription manager", error = error
if not zeroPortPresent:
updateAnnouncedAddrWithPrimaryIpAddr(node).isOkOr:
error "failed update announced addr", error = $error
else:
info "Listening port is dynamically allocated, address and ENR generation postponed"
info "Node started successfully"
proc stop*(node: WakuNode) {.async.} =
## By stopping the switch we are stopping all the underlying mounted protocols
# Cancel the background relay reconnection (may still be in its backoff wait).
if not node.relayReconnectFut.isNil():
await node.relayReconnectFut.cancelAndWait()
await node.subscriptionManager.stop()
node.stopProvidersAndListeners()
if not node.wakuKademlia.isNil():
await node.wakuKademlia.stop()
## NOTE: This will dispatch gossipsub stop to the WakuRelay.stop method override
await node.switch.stop()
node.peerManager.stop()
if not node.rln.isNil():
try:
await node.rln.stop() ## this can raise an exception
except Exception:
error "exception stopping the node", error = getCurrentExceptionMsg()
if not node.wakuArchive.isNil():
await node.wakuArchive.stopWait()
if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.stopWait()
if not node.wakuPeerExchangeClient.isNil() and
not node.wakuPeerExchangeClient.pxLoopHandle.isNil():
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()
if not node.wakuRendezvousClient.isNil():
await node.wakuRendezvousClient.stopWait()
node.started = false
proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
if node.rln == nil:
return true
return await node.rln.isReady()
## TODO: add other protocol `isReady` checks