From 39719e124770d62346ad79cdc1aaad165462e445 Mon Sep 17 00:00:00 2001 From: Darshan <35736874+darshankabariya@users.noreply.github.com> Date: Mon, 6 Apr 2026 15:53:45 +0530 Subject: [PATCH 1/3] increase default timeout to 20s and add debug logging (#3792) --- apps/wakucanary/wakucanary.nim | 40 +++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/apps/wakucanary/wakucanary.nim b/apps/wakucanary/wakucanary.nim index bb68f7237..e7b1ff9aa 100644 --- a/apps/wakucanary/wakucanary.nim +++ b/apps/wakucanary/wakucanary.nim @@ -6,12 +6,15 @@ import os import libp2p/protocols/ping, + libp2p/protocols/protocol, libp2p/crypto/[crypto, secp], libp2p/nameresolving/dnsresolver, libp2p/multicodec import ./certsgenerator, - waku/[waku_enr, node/peer_manager, waku_core, waku_node, factory/builder] + waku/[waku_enr, node/peer_manager, waku_core, waku_node, factory/builder], + waku/waku_metadata/protocol, + waku/common/callbacks # protocols and their tag const ProtocolsTable = { @@ -45,7 +48,7 @@ type WakuCanaryConf* = object timeout* {. desc: "Timeout to consider that the connection failed", - defaultValue: chronos.seconds(10), + defaultValue: chronos.seconds(20), name: "timeout", abbr: "t" .}: chronos.Duration @@ -251,12 +254,26 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} = error "failed to mount libp2p ping protocol: " & getCurrentExceptionMsg() quit(QuitFailure) - node.mountMetadata(conf.clusterId, conf.shards).isOkOr: - error "failed to mount metadata protocol", error + # Mount metadata with a custom getter that returns CLI shards directly, + # since the canary doesn't mount relay (which is what the default getter reads from). + # Without this fix, the canary always sends remoteShards=[] in metadata requests. + let cliShards = conf.shards + let shardsGetter: GetShards = proc(): seq[uint16] {.closure, gcsafe, raises: [].} = + return cliShards + + let metadata = WakuMetadata.new(conf.clusterId, shardsGetter) + node.wakuMetadata = metadata + node.peerManager.wakuMetadata = metadata + let mountRes = catch: + node.switch.mount(metadata, protocolMatcher(WakuMetadataCodec)) + mountRes.isOkOr: + error "failed to mount metadata protocol", error = error.msg quit(QuitFailure) await node.start() + debug "Connecting to peer", peer = peer, timeout = conf.timeout + var pingFut: Future[bool] if conf.ping: pingFut = pingNode(node, peer).withTimeout(conf.timeout) @@ -266,8 +283,18 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} = error "Timedout after", timeout = conf.timeout quit(QuitFailure) + # Clean disconnect with defer so the remote node doesn't see + # "Stream Underlying Connection Closed!" when we exit + defer: + debug "Cleanly disconnecting from peer", peerId = peer.peerId + await node.peerManager.disconnectNode(peer.peerId) + await node.stop() + + debug "Connected, checking connection status", peerId = peer.peerId + let lp2pPeerStore = node.switch.peerStore let conStatus = node.peerManager.switch.peerStore[ConnectionBook][peer.peerId] + debug "Connection status", peerId = peer.peerId, conStatus = conStatus var pingSuccess = true if conf.ping: @@ -283,14 +310,15 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} = if conStatus in [Connected, CanConnect]: let nodeProtocols = lp2pPeerStore[ProtoBook][peer.peerId] + debug "Peer protocols", peerId = peer.peerId, protocols = nodeProtocols if not areProtocolsSupported(conf.protocols, nodeProtocols): error "Not all protocols are supported", expected = conf.protocols, supported = nodeProtocols - quit(QuitFailure) + return 1 elif conStatus == CannotConnect: error "Could not connect", peerId = peer.peerId - quit(QuitFailure) + return 1 return 0 when isMainModule: From 56359e49ed60cb78c03f5e9c1519e43fe90deaab Mon Sep 17 00:00:00 2001 From: Fabiana Cecin Date: Mon, 6 Apr 2026 11:08:47 -0300 Subject: [PATCH 2/3] prefer reusing service peers across shards in edge filter reconciliation (#3789) * selectFilterCandidates prefers peers already serving other shards * restructure edgeFilterSubLoop (plan all dials then execute) for safety --- .../delivery_service/subscription_manager.nim | 78 +++++++++++++++---- 1 file changed, 64 insertions(+), 14 deletions(-) diff --git a/waku/node/delivery_service/subscription_manager.nim b/waku/node/delivery_service/subscription_manager.nim index 70d8df7f0..f00d9024c 100644 --- a/waku/node/delivery_service/subscription_manager.nim +++ b/waku/node/delivery_service/subscription_manager.nim @@ -115,7 +115,7 @@ proc subscribePubsubTopics( if isNil(self.node.wakuRelay): return err("subscribePubsubTopics requires a Relay") - var errors: seq[string] = @[] + var errors: seq[string] for shard in shards: if not self.contentTopicSubs.hasKey(shard): @@ -196,6 +196,11 @@ const EdgeFilterLoopInterval = chronos.seconds(30) const EdgeFilterSubLoopDebounce = chronos.seconds(1) ## Debounce delay to coalesce rapid-fire wakeups into a single reconciliation pass. +type EdgeDialTask = object + peer: RemotePeerInfo + shard: PubsubTopic + topics: seq[ContentTopic] + proc updateShardHealth( self: SubscriptionManager, shard: PubsubTopic, state: var EdgeFilterSubState ) = @@ -335,7 +340,7 @@ proc edgeFilterHealthLoop*(self: SubscriptionManager) {.async.} = var alive = initHashSet[PeerId]() if connected.len > 0: - var pingTasks: seq[(PeerId, Future[FilterSubscribeResult])] = @[] + var pingTasks: seq[(PeerId, Future[FilterSubscribeResult])] for peer in connected.values: pingTasks.add( (peer.peerId, self.node.wakuFilterClient.ping(peer, EdgeFilterPingTimeout)) @@ -362,6 +367,36 @@ proc edgeFilterHealthLoop*(self: SubscriptionManager) {.async.} = if changed: self.edgeFilterWakeup.fire() +proc selectFilterCandidates( + self: SubscriptionManager, shard: PubsubTopic, exclude: HashSet[PeerId], needed: int +): seq[RemotePeerInfo] = + ## Select filter service peer candidates for a shard. + + # Start with every filter server peer that can serve the shard + var allCandidates = self.node.peerManager.selectPeers( + filter_common.WakuFilterSubscribeCodec, some(shard) + ) + + # Remove all already used in this shard or being dialed for it + allCandidates.keepItIf(it.peerId notin exclude) + + # Collect peer IDs already tracked on other shards + var trackedOnOther = initHashSet[PeerId]() + for otherShard, otherState in self.edgeFilterSubStates.pairs: + if otherShard != shard: + for peer in otherState.peers: + trackedOnOther.incl(peer.peerId) + + # Prefer peers we already have a connection to first, preserving shuffle + var candidates = + allCandidates.filterIt(it.peerId in trackedOnOther) & + allCandidates.filterIt(it.peerId notin trackedOnOther) + + # We need to return 'needed' peers only + if candidates.len > needed: + candidates.setLen(needed) + return candidates + proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} = ## Reconciles filter subscriptions with the desired state from SubscriptionManager. var lastSynced = initTable[PubsubTopic, HashSet[ContentTopic]]() @@ -382,6 +417,12 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} = let allShards = toHashSet(toSeq(desired.keys)) + toHashSet(toSeq(lastSynced.keys)) + # Step 1: read state across all shards at once and + # create a list of peer dial tasks and shard tracking to delete. + + var dialTasks: seq[EdgeDialTask] + var shardsToDelete: seq[PubsubTopic] + for shard in allShards: let currTopics = desired.getOrDefault(shard) let prevTopics = lastSynced.getOrDefault(shard) @@ -404,11 +445,7 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} = asyncSpawn self.syncFilterDeltas(peer, shard, addedTopics, removedTopics) if currTopics.len == 0: - for fut in state.pending: - if not fut.finished: - await fut.cancelAndWait() - self.edgeFilterSubStates.del(shard) - # invalidates `state` — do not use after this + shardsToDelete.add(shard) else: self.updateShardHealth(shard, state[]) @@ -416,11 +453,7 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} = if needed > 0: let tracked = state.peers.mapIt(it.peerId).toHashSet() + state.pendingPeers - var candidates = self.node.peerManager.selectPeers( - filter_common.WakuFilterSubscribeCodec, some(shard) - ) - candidates.keepItIf(it.peerId notin tracked) - + let candidates = self.selectFilterCandidates(shard, tracked, needed) let toDial = min(needed, candidates.len) trace "edgeFilterSubLoop: shard reconciliation", @@ -432,8 +465,25 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} = toDial = toDial for i in 0 ..< toDial: - let fut = self.dialFilterPeer(candidates[i], shard, toSeq(currTopics)) - state.pending.add(fut) + dialTasks.add( + EdgeDialTask( + peer: candidates[i], shard: shard, topics: toSeq(currTopics) + ) + ) + + # Step 2: execute deferred shard tracking deletion and dial tasks. + + for shard in shardsToDelete: + self.edgeFilterSubStates.withValue(shard, state): + for fut in state.pending: + if not fut.finished: + await fut.cancelAndWait() + self.edgeFilterSubStates.del(shard) + + for task in dialTasks: + let fut = self.dialFilterPeer(task.peer, task.shard, task.topics) + self.edgeFilterSubStates.withValue(task.shard, state): + state.pending.add(fut) lastSynced = desired From 549bf8bc4309aadb30530a181fa1933bced2c9a7 Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Tue, 7 Apr 2026 13:14:32 +0530 Subject: [PATCH 3/3] fix(nix): fetch git submodules automatically via inputs.self (#3738) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Nix build fails when consumers use `nix build github:logos-messaging/logos-delivery#liblogosdelivery` without appending `?submodules=1` — vendor/nimbus-build-system is missing, causing patchShebangs and substituteInPlace to fail. Two fixes: 1. Add `inputs.self.submodules = true` to flake.nix (Nix >= 2.27) so submodules are fetched automatically without requiring callers to pass `?submodules=1`. 2. Fix the assertion in nix/default.nix: `(src.submodules or true)` always evaluates to true, silently masking the missing-submodules error. Changed to `builtins.pathExists` check on the actual submodule directory so it fails with a helpful message when submodules are genuinely absent. --- flake.nix | 4 ++++ nix/default.nix | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/flake.nix b/flake.nix index ee24c8f13..13ca5e618 100644 --- a/flake.nix +++ b/flake.nix @@ -7,6 +7,10 @@ }; inputs = { + # Ensure Nix fetches git submodules (vendor/*) when evaluating this flake. + # Requires Nix >= 2.27. Consumers no longer need '?submodules=1' in the URL. + self.submodules = true; + # We are pinning the commit because ultimately we want to use same commit across different projects. # A commit from nixpkgs 24.11 release : https://github.com/NixOS/nixpkgs/tree/release-24.11 nixpkgs.url = "github:NixOS/nixpkgs/0ef228213045d2cdb5a169a95d63ded38670b293"; diff --git a/nix/default.nix b/nix/default.nix index 7df58df60..816d0aed8 100644 --- a/nix/default.nix +++ b/nix/default.nix @@ -12,7 +12,7 @@ zerokitRln, }: -assert pkgs.lib.assertMsg ((src.submodules or true) == true) +assert pkgs.lib.assertMsg (builtins.pathExists "${src}/vendor/nimbus-build-system/scripts") "Unable to build without submodules. Append '?submodules=1#' to the URI."; let