{.push raises: [].}
import
  std/[
    options, sets, sequtils, times, strformat, strutils, math, random, tables, algorithm
  ],
  chronos,
  chronicles,
  metrics,
  libp2p/[multistream, muxers/muxer, nameresolving/nameresolver, peerstore],
  brokers/broker_context
import
  waku/[
    waku_core,
    waku_relay,
    waku_metadata,
    waku_core/topics/sharding,
    waku_relay/protocol,
    waku_enr/sharding,
    waku_enr/capabilities,
    events/peer_events,
    common/option_shims,
    common/nimchronos,
    common/enr,
    common/callbacks,
    common/utils/parse_size_units,
    node/health_monitor/online_monitor,
  ],
  ../waku_switch,
  ./peer_store/peer_storage,
  ./waku_peer_store
export waku_peer_store, peer_storage, peers
declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
# TODO: Populate from PeerStore.Source when ready
declarePublicCounter waku_node_conns_initiated,
  "Number of connections initiated", ["source"]
declarePublicCounter waku_peers_errors, "Number of peer manager errors", ["type"]
declarePublicGauge waku_connected_peers,
  "Number of physical connections per direction and protocol",
  labels = ["direction", "protocol"]
declarePublicGauge waku_connected_peers_per_shard,
  "Number of physical connections per shard", labels = ["shard"]
declarePublicGauge waku_connected_peers_per_agent,
  "Number of physical connections per agent", labels = ["agent"]
declarePublicGauge waku_streams_peers,
  "Number of streams per direction and protocol", labels = ["direction", "protocol"]
declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
declarePublicGauge waku_service_peers,
  "Service peer protocol and multiaddress ", labels = ["protocol", "peerId"]
declarePublicGauge waku_total_unique_peers, "total number of unique peers"

logScope:
  topics = "waku node peer_manager"

randomize()
const
  # TODO: Make configurable
  DefaultDialTimeout* = chronos.seconds(10)

  # Max attempts before removing the peer
  MaxFailedAttempts = 5

  # Time to wait before attempting to dial again is calculated as:
  # initialBackoffInSec*(backoffFactor^(failedAttempts-1))
  # 120s, 480s, 1920s, 7680s
  InitialBackoffInSec = 120
  BackoffFactor = 4

  # Limit the number of parallel dials
  MaxParallelDials = 10

  # Delay between consecutive relayConnectivityLoop runs
  ConnectivityLoopInterval = chronos.seconds(30)

  # How often the peer store is pruned
  PrunePeerStoreInterval = chronos.minutes(10)

  # How often metrics and logs are shown/updated
  LogAndMetricsInterval = chronos.minutes(5)

  # Max peers that we allow from the same IP
  DefaultColocationLimit* = 5
type ConnectionChangeHandler* = proc(
  peerId: PeerId, peerEvent: PeerEventKind
): Future[void] {.gcsafe, raises: [Defect].}

type PeerManager* = ref object of RootObj
  brokerCtx: BrokerContext
  switch*: Switch
  wakuMetadata*: WakuMetadata
  initialBackoffInSec*: int
  backoffFactor*: int
  maxFailedAttempts*: int
  storage*: PeerStorage
  serviceSlots*: Table[string, RemotePeerInfo]
  relayServiceRatio*: string
  maxRelayPeers*: int
  maxServicePeers*: int
  outRelayPeersTarget: int
  inRelayPeersTarget: int
  ipTable*: Table[string, seq[PeerId]]
  colocationLimit*: int
  started: bool
  shardedPeerManagement: bool # temp feature flag
  onConnectionChange*: ConnectionChangeHandler
  online: bool ## state managed by online_monitor module
  getShards: GetShards
  maxConnections: int
  activeStoreRequests*: Table[PeerId, int]

#~~~~~~~~~~~~~~~~~~~#
# Helper Functions #
#~~~~~~~~~~~~~~~~~~~#

proc calculateBackoff(
    initialBackoffInSec: int, backoffFactor: int, failedAttempts: int
): timer.Duration =
  if failedAttempts == 0:
    return chronos.seconds(0)
  return chronos.seconds(initialBackoffInSec * (backoffFactor ^ (failedAttempts - 1)))
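
# Worked example of the backoff formula above. This is a minimal standalone
# sketch, not part of the module: `backoffSeconds` is a hypothetical helper that
# returns plain integer seconds instead of a chronos `Duration`, so it only
# needs the standard library.

```nim
import std/math

proc backoffSeconds(initialBackoffInSec, backoffFactor, failedAttempts: int): int =
  ## Same arithmetic as `calculateBackoff`, without the chronos dependency.
  if failedAttempts == 0:
    return 0
  return initialBackoffInSec * (backoffFactor ^ (failedAttempts - 1))

when isMainModule:
  # With an initial backoff of 120 s and a factor of 4, failed attempts 1..4
  # wait 120, 480, 1920 and 7680 seconds respectively.
  for attempts in 0 .. 4:
    echo attempts, " failed attempt(s) -> ", backoffSeconds(120, 4, attempts), " s"
```

# Because the wait grows geometrically, a peer that keeps failing is retried
# less and less often until it reaches `MaxFailedAttempts`.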

proc protocolMatcher*(codec: string): Matcher =
  ## Returns a protocol matcher function for the provided codec
  proc match(proto: string): bool {.gcsafe.} =
    ## Matches a proto with any postfix to the provided codec.
    ## E.g. if the codec is `/vac/waku/filter/2.0.0` it matches the protos:
    ## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`, `/vac/waku/filter/2.0.0-actualnonsense`
    return proto.startsWith(codec)

  return match
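
# Standalone sketch of the prefix matching done by `protocolMatcher`.
# `prefixMatcher` is a hypothetical stand-in that returns a plain closure
# rather than libp2p's `Matcher` type, so it runs without libp2p.

```nim
import std/strutils

proc prefixMatcher(codec: string): proc(proto: string): bool =
  ## Returns a closure that accepts any protocol id starting with `codec`.
  proc match(proto: string): bool =
    proto.startsWith(codec)

  return match

when isMainModule:
  let matches = prefixMatcher("/vac/waku/filter/2.0.0")
  echo matches("/vac/waku/filter/2.0.0-beta3") # suffix after the codec: accepted
  echo matches("/vac/waku/filter/2.0.01") # also accepted: only the prefix is checked
  echo matches("/vac/waku/store/2.0.0") # different codec: rejected
```

# Note that a pure prefix check accepts any continuation of the codec string,
# including ones that read as a different version number.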
#~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Peer Storage Management #
#~~~~~~~~~~~~~~~~~~~~~~~~~~#

proc insertOrReplace(ps: PeerStorage, remotePeerInfo: RemotePeerInfo) {.gcsafe.} =
  ## Insert peer entry into persistent storage, or replace existing entry with updated info
  ps.put(remotePeerInfo).isOkOr:
    warn "failed to store peers", err = error
    waku_peers_errors.inc(labelValues = ["storage_failure"])
    return

proc addPeer*(
    pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownOrigin
) {.gcsafe.} =
  ## Adds peer to the manager's peer store and, if configured, to persistent storage
  if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
    trace "skipping to manage our unmanageable self"
    return

  pm.switch.peerStore.addPeer(remotePeerInfo, origin)

  trace "Adding peer to manager",
    peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs, origin

  waku_total_unique_peers.inc()

  # Add peer to storage. Entry will subsequently be updated with connectedness information
  if not pm.storage.isNil:
    # Reading from the db (pm.storage) is only done on startup, hence you need to connect to all saved peers.
    # `remotePeerInfo.connectedness` should already be `NotConnected`, but we reset it to `NotConnected` just in case.
    # This reset is also done when reading from storage, I believe, to ensure the `connectedness` state is the correct one.
    # So many resets are likely redundant, but I haven't verified whether this is the case or not.
    remotePeerInfo.connectedness = NotConnected

    pm.storage.insertOrReplace(remotePeerInfo)

proc getPeer*(pm: PeerManager, peerId: PeerId): RemotePeerInfo =
  return pm.switch.peerStore.getPeer(peerId)

proc addActiveStoreRequest*(pm: PeerManager, peerId: PeerId) {.gcsafe.} =
  pm.activeStoreRequests.mgetOrPut(peerId, 0).inc()

proc removeActiveStoreRequest*(pm: PeerManager, peerId: PeerId) {.gcsafe.} =
  let count = pm.activeStoreRequests.getOrDefault(peerId, 0)
  if count == 0:
    return
  let newCount = count - 1
  if newCount <= 0:
    pm.activeStoreRequests.del(peerId)
  else:
    pm.activeStoreRequests[peerId] = newCount

proc hasActiveStoreRequest*(pm: PeerManager, peerId: PeerId): bool {.gcsafe.} =
  pm.activeStoreRequests.contains(peerId)
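
# Standalone sketch of the per-peer request counting used by the three
# `*ActiveStoreRequest` procs above. The names `RequestCounter`, `acquire` and
# `release` are hypothetical, and a `string` key stands in for `PeerId`.

```nim
import std/tables

type RequestCounter = Table[string, int]

proc acquire(counter: var RequestCounter, peer: string) =
  ## Creates the entry at 0 if missing, then increments it in place.
  counter.mgetOrPut(peer, 0).inc()

proc release(counter: var RequestCounter, peer: string) =
  ## Decrements the entry and drops it once it reaches zero, so that
  ## "key present" always means "at least one request in flight".
  let count = counter.getOrDefault(peer, 0)
  if count <= 1:
    counter.del(peer) # no-op when the key is absent
  else:
    counter[peer] = count - 1

when isMainModule:
  var counter: RequestCounter
  counter.acquire("peerA")
  counter.acquire("peerA")
  counter.release("peerA")
  echo "peerA" in counter # still one request in flight
  counter.release("peerA")
  echo "peerA" in counter # entry removed
```

# Deleting the entry at zero is what lets `hasActiveStoreRequest` be a simple
# `contains` check.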

proc loadFromStorage(pm: PeerManager) {.gcsafe.} =
  ## Load peers from storage, if available

  trace "loading peers from storage"

  var amount = 0

  proc onData(remotePeerInfo: RemotePeerInfo) =
    let peerId = remotePeerInfo.peerId

    if pm.switch.peerInfo.peerId == peerId:
      # Do not manage self
      return

    trace "loading peer",
      peerId = peerId,
      address = remotePeerInfo.addrs,
      protocols = remotePeerInfo.protocols,
      agent = remotePeerInfo.agent,
      version = remotePeerInfo.protoVersion

    # nim-libp2p books
    pm.switch.peerStore[AddressBook][peerId] = remotePeerInfo.addrs
    pm.switch.peerStore[ProtoBook][peerId] = remotePeerInfo.protocols
    pm.switch.peerStore[KeyBook][peerId] = remotePeerInfo.publicKey
    pm.switch.peerStore[AgentBook][peerId] = remotePeerInfo.agent
    pm.switch.peerStore[ProtoVersionBook][peerId] = remotePeerInfo.protoVersion

    # custom books
    pm.switch.peerStore[ConnectionBook][peerId] = NotConnected
    # Reset connectedness state
    pm.switch.peerStore[DisconnectBook][peerId] = remotePeerInfo.disconnectTime
    pm.switch.peerStore[SourceBook][peerId] = remotePeerInfo.origin

    if remotePeerInfo.enr.isSome():
      pm.switch.peerStore[ENRBook][peerId] = remotePeerInfo.enr.get()

    amount.inc()

  pm.storage.getAll(onData).isOkOr:
    warn "loading peers from storage failed", err = error
    waku_peers_errors.inc(labelValues = ["storage_load_failure"])
    return

  trace "recovered peers from storage", amount = amount

proc selectPeers*(
    pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
): seq[RemotePeerInfo] =
  ## Returns all peers that support the given protocol (and optionally shard),
  ## shuffled randomly. Callers can further filter or pick from this list.
  var peers = pm.switch.peerStore.getPeersByProtocol(proto)
  trace "Selecting peers from peerstore",
    protocol = proto, num_peers = peers.len, address = cast[uint](pm.switch.peerStore)

  if shard.isSome():
    let shardInfo = RelayShard.parse(shard.get()).valueOr:
      trace "Failed to parse shard from pubsub topic", topic = shard.get()
      return @[]

    peers.keepItIf(
      (it.enr.isSome() and it.enr.get().containsShard(shard.get())) or
        (it.shards.len > 0 and it.shards.contains(shardInfo.shardId))
    )

  shuffle(peers)
  return peers
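
# Standalone sketch of the "filter by shard, then shuffle" step in
# `selectPeers`. `PeerSketch` and `selectByShard` are hypothetical; the real
# proc also consults the peer's ENR, which is left out here.

```nim
import std/[random, sequtils]

type PeerSketch = object
  id: string
  shards: seq[uint16]

proc selectByShard(peers: seq[PeerSketch], shardId: uint16): seq[PeerSketch] =
  ## Keeps only peers that advertise `shardId`, in random order.
  var candidates = peers
  candidates.keepItIf(shardId in it.shards)
  shuffle(candidates)
  return candidates

when isMainModule:
  randomize()
  let known = @[
    PeerSketch(id: "a", shards: @[1'u16, 2'u16]),
    PeerSketch(id: "b", shards: @[2'u16]),
    PeerSketch(id: "c", shards: @[3'u16]),
  ]
  # Prints peers "a" and "b" in a random order; "c" is filtered out.
  for peer in selectByShard(known, 2'u16):
    echo peer.id
```

# Shuffling once in the selector means callers that simply take the first
# element still spread load across all eligible peers.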

proc selectPeer*(
    pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
): Option[RemotePeerInfo] =
  ## Selects a single peer for a given protocol, checking service slots first
  ## (for non-relay protocols).
  let peers = pm.selectPeers(proto, shard)

  # No selection criteria for WakuRelay yet: pick a random peer
  # (`selectPeers` returns an already shuffled list)
  if proto == WakuRelayCodec:
    # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
    if peers.len > 0:
      trace "Got peer from peerstore",
        peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto
      return some(peers[0])
    trace "No peer found for protocol", protocol = proto
    return none(RemotePeerInfo)

  # For other protocols, we select the peer that is slotted for the given protocol
  pm.serviceSlots.withValue(proto, serviceSlot):
    trace "Got peer from service slots",
      peerId = serviceSlot[].peerId, multi = serviceSlot[].addrs[0], protocol = proto
    return some(serviceSlot[])

  # If not slotted, we select a random peer for the given protocol
  if peers.len > 0:
    trace "Got peer from peerstore",
      peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto
    return some(peers[0])
  trace "No peer found for protocol", protocol = proto
  return none(RemotePeerInfo)
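
# Standalone sketch of the non-relay selection order in `selectPeer`: a slotted
# peer wins, otherwise the first candidate is used, otherwise nothing is
# returned. `pickServicePeer` is hypothetical and uses `string` in place of
# `RemotePeerInfo`; the relay branch, which skips the slots, is not modelled.

```nim
import std/[options, tables]

proc pickServicePeer(
    slots: Table[string, string], candidates: seq[string], proto: string
): Option[string] =
  ## `candidates` is assumed to be pre-shuffled, as `selectPeers` returns it.
  if proto in slots:
    return some(slots[proto])
  if candidates.len > 0:
    return some(candidates[0])
  return none(string)

when isMainModule:
  let slots = {"/store": "slottedPeer"}.toTable
  echo pickServicePeer(slots, @["p1", "p2"], "/store") # slot takes priority
  echo pickServicePeer(slots, @["p1", "p2"], "/filter") # falls back to candidates
  echo pickServicePeer(slots, @[], "/filter") # nothing available
```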
# Adds a peer to the service slots, which is a list of peers that are slotted for a given protocol
proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
# Do not add relay peers
if proto == WakuRelayCodec:
warn "Can't add relay peer to service peers slots"
return
# Check if the number of service peers has reached the maximum limit
if pm.serviceSlots.len >= pm.maxServicePeers:
warn "Maximum number of service peers reached. Cannot add more.",
peerId = remotePeerInfo.peerId, service = proto
return
info "Adding peer to service slots",
peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto
waku_service_peers.set(1, labelValues = [$proto, $remotePeerInfo.addrs[0]])
# Set peer for service slot
pm.serviceSlots[proto] = remotePeerInfo
pm.addPeer(remotePeerInfo)
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Connection Lifecycle Management #
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Forward declaration: defined further below, but needed by the connection procs
proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.}
# Connects to a given node. Note that this function uses `connect` and
# does not provide a protocol. Streams for relay (gossipsub) are created
# automatically without needing to dial.
proc connectPeer*(
pm: PeerManager,
peer: RemotePeerInfo,
dialTimeout = DefaultDialTimeout,
source = "api",
): Future[bool] {.async.} =
let peerId = peer.peerId
var peerStore = pm.switch.peerStore
# Do not attempt to dial self
if peerId == pm.switch.peerInfo.peerId:
return false
if not peerStore.peerExists(peerId):
pm.addPeer(peer)
let failedAttempts = peerStore[NumberFailedConnBook][peerId]
trace "Connecting to peer",
wireAddr = peer.addrs, peerId = peerId, failedAttempts = failedAttempts
var deadline = sleepAsync(dialTimeout)
let workfut = pm.switch.connect(peerId, peer.addrs)
# Can't use catch: with .withTimeout() in this case
let res = catch:
await workfut or deadline
let reasonFailed =
if not workfut.finished():
await workfut.cancelAndWait()
"timed out"
elif res.isErr():
res.error.msg
else:
if not deadline.finished():
await deadline.cancelAndWait()
waku_peers_dials.inc(labelValues = ["successful"])
waku_node_conns_initiated.inc(labelValues = [source])
peerStore[NumberFailedConnBook][peerId] = 0
return true
# Dial failed
peerStore[NumberFailedConnBook][peerId] = peerStore[NumberFailedConnBook][peerId] + 1
peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
peerStore[ConnectionBook][peerId] = CannotConnect
trace "Connecting peer failed",
peerId = peerId,
reason = reasonFailed,
failedAttempts = peerStore[NumberFailedConnBook][peerId]
waku_peers_dials.inc(labelValues = [reasonFailed])
return false
proc connectToNodes*(
pm: PeerManager,
nodes: seq[string] | seq[RemotePeerInfo],
dialTimeout = DefaultDialTimeout,
source = "api",
) {.async.} =
if nodes.len == 0:
return
info "Dialing multiple peers", numOfPeers = nodes.len, nodes = $nodes
var futConns: seq[Future[bool]]
var connectedPeers: seq[RemotePeerInfo]
for node in nodes:
let node = parsePeerInfo(node)
if node.isOk():
futConns.add(pm.connectPeer(node.value))
connectedPeers.add(node.value)
else:
error "Couldn't parse node info", error = node.error
await allFutures(futConns)
# Filtering successful connectedPeers based on futConns
let combined = zip(connectedPeers, futConns)
connectedPeers = combined.filterIt(it[1].read() == true).mapIt(it[0])
when defined(debugDiscv5):
let peerIds = connectedPeers.mapIt(it.peerId)
let origin = connectedPeers.mapIt(it.origin)
if peerIds.len > 0:
notice "established connections with found peers",
peerIds = peerIds.mapIt(shortLog(it)), origin = origin
else:
notice "could not connect to new peers", attempted = nodes.len
info "Finished dialing multiple peers",
successfulConns = connectedPeers.len, attempted = nodes.len
proc disconnectNode*(pm: PeerManager, peerId: PeerId) {.async.} =
await pm.switch.disconnect(peerId)
proc disconnectNode*(pm: PeerManager, peer: RemotePeerInfo) {.async.} =
let peerId = peer.peerId
await pm.disconnectNode(peerId)
# Dialing should be used only for protocols that require a stream to read and write.
# It must not be used to dial Relay protocols, since that would create
# unnecessary unused streams.
proc dialPeer(
pm: PeerManager,
peerId: PeerID,
addrs: seq[MultiAddress],
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api",
): Future[Option[Connection]] {.async.} =
if peerId == pm.switch.peerInfo.peerId:
error "could not dial self"
return none(Connection)
if proto == WakuRelayCodec:
error "dial shall not be used to connect to relays"
return none(Connection)
trace "Dialing peer", wireAddr = addrs, peerId = peerId, proto = proto
# Dial Peer
let dialFut = pm.switch.dial(peerId, addrs, proto)
let res = catch:
if await dialFut.withTimeout(dialTimeout):
return some(dialFut.read())
else:
await cancelAndWait(dialFut)
let reasonFailed = if res.isOk: "timed out" else: res.error.msg
trace "Dialing peer failed", peerId = peerId, reason = reasonFailed, proto = proto
return none(Connection)
proc dialPeer*(
pm: PeerManager,
remotePeerInfo: RemotePeerInfo,
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api",
): Future[Option[Connection]] {.async.} =
# Dial a given peer and add it to the list of known peers
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
# First add dialed peer info to peer store, if it does not exist yet..
# TODO: nim libp2p peerstore already adds them
if not pm.switch.peerStore.hasPeer(remotePeerInfo.peerId, proto):
trace "Adding newly dialed peer to manager",
peerId = $remotePeerInfo.peerId, address = $remotePeerInfo.addrs[0], proto = proto
pm.addPeer(remotePeerInfo)
return await pm.dialPeer(
remotePeerInfo.peerId, remotePeerInfo.addrs, proto, dialTimeout, source
)
proc dialPeer*(
pm: PeerManager,
peerId: PeerID,
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api",
): Future[Option[Connection]] {.async.} =
# Dial an existing peer by looking up its existing addrs in the switch's peerStore
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
let addrs = pm.switch.peerStore[AddressBook][peerId]
return await pm.dialPeer(peerId, addrs, proto, dialTimeout, source)
proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool =
# Returns whether we can try to connect to this peer, based on past failed attempts.
# It uses an exponential backoff: each failed connection attempt makes us
# wait longer before trying again.
let peerStore = pm.switch.peerStore
let failedAttempts = peerStore[NumberFailedConnBook][peerId]
# if it never errored, we can try to connect
if failedAttempts == 0:
return true
# if there are too many failed attempts, do not reconnect
if failedAttempts >= pm.maxFailedAttempts:
return false
# If it errored we wait an exponential backoff from last connection
# the more failed attempts, the greater the backoff since last attempt
let now = Moment.init(getTime().toUnix, Second)
let lastFailed = peerStore[LastFailedConnBook][peerId]
let backoff =
calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts)
return now >= (lastFailed + backoff)
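
# Illustrative sketch only, not part of the module API: how an exponential
# backoff gate such as `canBeConnected` behaves. `calculateBackoff` is defined
# outside this section, so the formula used here,
# initial * factor^(failedAttempts - 1), is an assumption. Times are plain
# seconds, and `exampleBackoffSecs` / `exampleCanRetry` are hypothetical names.
func exampleBackoffSecs(initialSecs, factor, failedAttempts: int): int =
  if failedAttempts <= 0:
    return 0
  var backoff = initialSecs
  # Multiply once per failed attempt after the first.
  for _ in 1 ..< failedAttempts:
    backoff *= factor
  backoff

func exampleCanRetry(
    nowSecs, lastFailedSecs, initialSecs, factor, failedAttempts, maxFailedAttempts: int
): bool =
  # Never failed: always allowed.
  if failedAttempts == 0:
    return true
  # Too many failures: give up on this peer.
  if failedAttempts >= maxFailedAttempts:
    return false
  # Otherwise wait until the backoff window since the last failure has passed.
  nowSecs >= lastFailedSecs + exampleBackoffSecs(initialSecs, factor, failedAttempts)
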
proc connectedPeers*(
pm: PeerManager, protocol: string = ""
): (seq[PeerId], seq[PeerId]) =
## Returns the PeerIds of peers with an active socket connection.
## If a protocol is specified, it returns peers that currently have one
## or more active logical streams for that protocol.
var inPeers: seq[PeerId]
var outPeers: seq[PeerId]
for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
let streams = peerConn.getStreams()
if protocol.len == 0 or streams.anyIt(it.protocol == protocol):
if peerConn.connection.transportDir == Direction.In:
inPeers.add(peerId)
elif peerConn.connection.transportDir == Direction.Out:
outPeers.add(peerId)
return (inPeers, outPeers)
proc evictPeer*(pm: PeerManager, peerId: PeerId) {.async.} =
## Policy-based eviction (relay-peer limit, IP colocation, pruning).
## Skips the disconnect when the peer has an in-flight store request to
## avoid aborting active store requests.
if pm.hasActiveStoreRequest(peerId):
trace "skipping peer eviction: active store request", peerId = peerId
return
await pm.switch.disconnect(peerId)
proc capablePeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
## Returns the PeerIds of peers with an active socket connection that
## have identified themselves as supporting the given protocol,
## split into (inbound, outbound).
var inPeers: seq[PeerId]
var outPeers: seq[PeerId]
for peerId, muxers in pm.switch.connManager.getConnections():
# filter out peers that don't have the capability registered in the peer store
if pm.switch.peerStore.hasPeer(peerId, protocol):
for peerConn in muxers:
if peerConn.connection.transportDir == Direction.In:
inPeers.add(peerId)
elif peerConn.connection.transportDir == Direction.Out:
outPeers.add(peerId)
return (inPeers, outPeers)
proc getConnectedPeersCount*(pm: PeerManager, protocol: string): int =
## Returns the total number of unique connected peers (inbound + outbound)
## with active streams for a specific protocol.
let (inPeers, outPeers) = pm.connectedPeers(protocol)
var peers = initHashSet[PeerId](nextPowerOfTwo(inPeers.len + outPeers.len))
for p in inPeers:
peers.incl(p)
for p in outPeers:
peers.incl(p)
return peers.len
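
# Illustrative sketch only, not part of the module API: why the peer counts
# above go through a HashSet. A peer with both an inbound and an outbound
# connection appears in both lists and must be counted once. Peers are
# modelled as strings, and `exampleUniqueCount` is a hypothetical name.
import std/sets

func exampleUniqueCount(inPeers, outPeers: seq[string]): int =
  var seen = initHashSet[string]()
  for p in inPeers:
    seen.incl(p)
  for p in outPeers:
    seen.incl(p)
  seen.len
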
proc getCapablePeersCount*(pm: PeerManager, protocol: string): int =
## Returns the total number of unique connected peers (inbound + outbound)
## who have identified themselves as supporting the given protocol.
let (inPeers, outPeers) = pm.capablePeers(protocol)
var peers = initHashSet[PeerId](nextPowerOfTwo(inPeers.len + outPeers.len))
for p in inPeers:
peers.incl(p)
for p in outPeers:
peers.incl(p)
return peers.len
proc getPeersForShard*(pm: PeerManager, protocolId: string, shard: PubsubTopic): int =
let (inPeers, outPeers) = pm.connectedPeers(protocolId)
let connectedProtocolPeers = inPeers & outPeers
if connectedProtocolPeers.len == 0:
return 0
let shardInfo = RelayShard.parse(shard).valueOr:
# count raw peers of the given protocol if for some reason we can't get
# a shard mapping out of the gossipsub topic string.
return connectedProtocolPeers.len
var shardPeers = 0
for peerId in connectedProtocolPeers:
if pm.switch.peerStore.hasShard(peerId, shardInfo.clusterId, shardInfo.shardId):
shardPeers.inc()
return shardPeers
2025-06-27 11:16:00 +02:00
proc disconnectAllPeers*(pm: PeerManager) {.async.} =
let (inPeerIds, outPeerIds) = pm.connectedPeers()
let connectedPeers = concat(inPeerIds, outPeerIds)
let futs = connectedPeers.mapIt(pm.disconnectNode(it))
await allFutures(futs)
proc getStreamByPeerIdAndProtocol*(
pm: PeerManager, peerId: PeerId, protocol: string
): Future[Result[Connection, string]] {.async.} =
## Establishes a new stream to the given peer and protocol or returns the existing stream, if any.
## Notice that the "Connection" type represents a stream within a transport connection
## (we will need to adapt this term.)
let peerIdsMuxers: Table[PeerId, seq[Muxer]] = pm.switch.connManager.getConnections()
if not peerIdsMuxers.contains(peerId):
return err("peerId not found in connManager: " & $peerId)
let muxers = peerIdsMuxers[peerId]
var streams = newSeq[Connection](0)
for m in muxers:
for s in m.getStreams():
## getStreams is defined in nim-libp2p
streams.add(s)
## Try to get the opened streams for the given protocol
let streamsOfInterest = streams.filterIt(
it.protocol == protocol and not LPStream(it).isClosed and
not LPStream(it).isClosedRemotely
)
if streamsOfInterest.len > 0:
## In theory there should be one stream per protocol, so we pick the first one
return ok(streamsOfInterest[0])
## There is no open stream yet, so dial to create one
let streamRes = await pm.dialPeer(peerId, protocol)
if streamRes.isNone():
return err("getStreamByPeerIdAndProtocol no connection to peer: " & $peerId)
return ok(streamRes.get())
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
# only attempt if current node is online
if not pm.online:
error "connectToRelayPeers: won't attempt new connections - node is offline"
return
var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
if inRelayPeers.len > pm.inRelayPeersTarget:
await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)
if outRelayPeers.len >= pm.outRelayPeersTarget:
return
let notConnectedPeers = pm.switch.peerStore.getDisconnectedPeers()
var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
shuffle(outsideBackoffPeers)
var index = 0
var numPendingConnReqs =
min(outsideBackoffPeers.len, pm.outRelayPeersTarget - outRelayPeers.len)
## number of outstanding connection requests
while numPendingConnReqs > 0 and outRelayPeers.len < pm.outRelayPeersTarget:
let numPeersToConnect = min(numPendingConnReqs, MaxParallelDials)
await pm.connectToNodes(outsideBackoffPeers[index ..< (index + numPeersToConnect)])
(inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
index += numPeersToConnect
numPendingConnReqs -= numPeersToConnect
proc reconnectPeers*(
pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0)
) {.async.} =
## Reconnect to peers registered for this protocol. This will update connectedness.
## Especially useful to resume connections from persistent storage after a restart.
info "Reconnecting peers", proto = proto
# Proto is not persisted, we need to iterate over all peers.
for peerInfo in pm.switch.peerStore.peers(protocolMatcher(proto)):
# Check that the peer can be connected
if peerInfo.connectedness == CannotConnect:
error "Not reconnecting to unreachable or non-existing peer",
peerId = peerInfo.peerId
continue
if backoffTime > ZeroDuration:
info "Backing off before reconnect",
peerId = peerInfo.peerId, backoffTime = backoffTime
# We disconnected recently and still need to wait for a backoff period before connecting
await sleepAsync(backoffTime)
await pm.connectToNodes(@[peerInfo])
proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
var
numStreamsIn = 0
numStreamsOut = 0
for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
for stream in peerConn.getStreams():
if stream.protocol == protocol:
if stream.dir == Direction.In:
numStreamsIn += 1
elif stream.dir == Direction.Out:
numStreamsOut += 1
return (numStreamsIn, numStreamsOut)
proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] =
if not pm.switch.connManager.getConnections().hasKey(peerId):
return none(string)
let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
if conns.len == 0:
return none(string)
let obAddr = conns[0].connection.observedAddr.valueOr:
return none(string)
# TODO: think if circuit relay ips should be handled differently
return some(obAddr.getHostname())
#~~~~~~~~~~~~~~~~~#
# Event Handling #
#~~~~~~~~~~~~~~~~~#
proc refreshPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
let res = catch:
await pm.switch.dial(peerId, WakuMetadataCodec)
var reason: string
block guardClauses:
let conn = res.valueOr:
reason = "dial failed: " & error.msg
break guardClauses
let metadata = (await pm.wakuMetadata.request(conn)).valueOr:
reason = "waku metadata request failed: " & error
break guardClauses
let clusterId = metadata.clusterId.valueOr:
reason = "empty cluster-id reported"
break guardClauses
if pm.wakuMetadata.clusterId != clusterId:
reason =
"different clusterId reported: " & $pm.wakuMetadata.clusterId & " vs " &
$clusterId
break guardClauses
# Store the shard information from metadata in the peer store
if pm.switch.peerStore.peerExists(peerId):
let shards = metadata.shards.mapIt(it.uint16)
pm.switch.peerStore.setShardInfo(peerId, shards)
# TODO: should only trigger an event if metadata actually changed
# should include the shard subscription delta in the event when
# it is a MetadataUpdated event
WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventMetadataUpdated)
return
info "disconnecting from peer", peerId = peerId, reason = reason
asyncSpawn(pm.switch.disconnect(peerId))
pm.switch.peerStore.delete(peerId)
# Called when a peer (i) first connects to us or (ii) closes all of its connections to us
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
await pm.refreshPeerMetadata(peerId)
var peerStore = pm.switch.peerStore
var direction: PeerDirection
var connectedness: Connectedness
case event.kind
of PeerEventKind.Joined:
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected
## Check max allowed in-relay peers
let inRelayPeers = pm.connectedPeers(WakuRelayCodec)[0]
if inRelayPeers.len > pm.inRelayPeersTarget and
peerStore.hasPeer(peerId, WakuRelayCodec):
info "relay peer limit reached, evicting peer",
peerId = peerId,
inRelayPeers = inRelayPeers.len,
inRelayPeersTarget = pm.inRelayPeersTarget
await pm.evictPeer(peerId)
## Apply max ip colocation limit
if (let ip = pm.getPeerIp(peerId); ip.isSome()):
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
# in theory this should always be one, but just in case
let peersBehindIp = pm.ipTable[ip.get]
# pm.colocationLimit == 0 disables the ip colocation limit
if pm.colocationLimit != 0 and peersBehindIp.len > pm.colocationLimit:
for peerId in peersBehindIp[0 ..< (peersBehindIp.len - pm.colocationLimit)]:
info "Pruning connection due to ip colocation", peerId = peerId, ip = ip
asyncSpawn(pm.evictPeer(peerId))
peerStore.delete(peerId)
WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventConnected)
2025-01-08 18:53:00 +01:00
if not pm.onConnectionChange.isNil():
# we don't want to await for the callback to finish
asyncSpawn pm.onConnectionChange(peerId, Joined)
of PeerEventKind.Left:
direction = UnknownDirection
connectedness = CanConnect
# note we can't access the peerId ip here as the connection was already closed
for ip, peerIds in pm.ipTable.pairs:
if peerIds.contains(peerId):
pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
if pm.ipTable[ip].len == 0:
pm.ipTable.del(ip)
break
WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventDisconnected)
if not pm.onConnectionChange.isNil():
# we don't want to await for the callback to finish
asyncSpawn pm.onConnectionChange(peerId, Left)
of PeerEventKind.Identified:
info "event identified", peerId = peerId
WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventIdentified)
peerStore[ConnectionBook][peerId] = connectedness
peerStore[DirectionBook][peerId] = direction
if not pm.storage.isNil:
var remotePeerInfo = peerStore.getPeer(peerId)
if event.kind == PeerEventKind.Left:
remotePeerInfo.disconnectTime = getTime().toUnix
pm.storage.insertOrReplace(remotePeerInfo)
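
# Illustrative sketch only, not part of the module API: the IP colocation rule
# applied in `onPeerEvent` above. When more peers share an IP than the limit
# allows, the earliest-registered ones are evicted so that exactly `limit`
# remain, and a limit of 0 disables the check. Peers are modelled as strings,
# and `examplePeersToEvict` is a hypothetical name.
func examplePeersToEvict(peersBehindIp: seq[string], colocationLimit: int): seq[string] =
  if colocationLimit == 0 or peersBehindIp.len <= colocationLimit:
    return @[]
  # Keep the last `colocationLimit` entries and evict everything before them.
  peersBehindIp[0 ..< (peersBehindIp.len - colocationLimit)]
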
#~~~~~~~~~~~~~~~~~#
# Metrics Logging #
#~~~~~~~~~~~~~~~~~#
proc logAndMetrics(pm: PeerManager) {.async.} =
heartbeat "Scheduling log and metrics run", LogAndMetricsInterval:
var peerStore = pm.switch.peerStore
# log metrics
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let notConnectedPeers =
peerStore.getDisconnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
2025-09-26 03:30:55 +05:30
let connections = pm.switch.connManager.getConnections()
let totalConnections = connections.len
info "Relay peer connections",
inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget,
outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget,
totalConnections = $totalConnections & "/" & $pm.maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
# update prometheus metrics
for proto in peerStore.getWakuProtos():
let (protoConnsIn, protoConnsOut) = pm.connectedPeers(proto)
let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto)
waku_connected_peers.set(
protoConnsIn.len.float64, labelValues = [$Direction.In, proto]
)
waku_connected_peers.set(
protoConnsOut.len.float64, labelValues = [$Direction.Out, proto]
)
waku_streams_peers.set(
protoStreamsIn.float64, labelValues = [$Direction.In, proto]
)
waku_streams_peers.set(
protoStreamsOut.float64, labelValues = [$Direction.Out, proto]
)
var agentCounts = initTable[string, int]()
var connectedPeerIds: HashSet[PeerId]
for peerId, muxers in connections:
connectedPeerIds.incl(peerId)
if peerStore[AgentBook].contains(peerId):
let agent = peerStore[AgentBook][peerId]
agentCounts[agent] = agentCounts.getOrDefault(agent, 0) + 1
for agent, count in agentCounts:
waku_connected_peers_per_agent.set(count.float64, labelValues = [$agent])
for shard in pm.getShards().items:
# peers known for this shard
let shardPeers =
peerStore.getPeersByShard(uint16(pm.wakuMetadata.clusterId), shard)
# keep only those that are physically connected right now
let connectedInShard = shardPeers.filterIt(connectedPeerIds.contains(it.peerId))
waku_connected_peers_per_shard.set(
connectedInShard.len.float64, labelValues = [$shard]
)
proc getOnlineStateObserver*(pm: PeerManager): OnOnlineStateChange =
return proc(online: bool) {.gcsafe, raises: [].} =
pm.online = online
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Pruning and Maintenance (Stale Peers Management) #
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
proc manageRelayPeers*(pm: PeerManager) {.async.} =
let shardsCount = pm.getShards().len
#TODO: this check should not be based on whether shards are present, but rather if relay is mounted
if shardsCount == 0:
return
if not pm.online:
error "manageRelayPeers: won't attempt new connections - node is offline"
return
var peersToConnect: HashSet[PeerId] # Can't use RemotePeerInfo as they are ref objects
var peersToDisconnect: int
# Get all connected peers for Waku Relay
var (inPeers, outPeers) = pm.connectedPeers(WakuRelayCodec)
# Calculate in/out target number of peers for each shard
let inTarget = pm.inRelayPeersTarget div shardsCount
let outTarget = pm.outRelayPeersTarget div shardsCount
var peerStore = pm.switch.peerStore
for shard in pm.getShards().items:
# Filter out peers not on this shard
let connectedInPeers =
inPeers.filterIt(peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), shard))
let connectedOutPeers = outPeers.filterIt(
peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), shard)
)
# Calculate the difference between current values and targets
let inPeerDiff = connectedInPeers.len - inTarget
let outPeerDiff = outTarget - connectedOutPeers.len
if inPeerDiff > 0:
peersToDisconnect += inPeerDiff
if outPeerDiff <= 0:
continue
# Get all peers for this shard
var connectablePeers =
peerStore.getPeersByShard(uint16(pm.wakuMetadata.clusterId), uint16(shard))
let shardCount = connectablePeers.len
connectablePeers.keepItIf(
not peerStore.isConnected(it.peerId) and pm.canBeConnected(it.peerId)
)
let connectableCount = connectablePeers.len
connectablePeers.keepItIf(peerStore.hasCapability(it.peerId, Relay))
let relayCount = connectablePeers.len
info "Sharded Peer Management",
shard = shard,
connectable = $connectableCount & "/" & $shardCount,
relayConnectable = $relayCount & "/" & $shardCount,
relayInboundTarget = $connectedInPeers.len & "/" & $inTarget,
relayOutboundTarget = $connectedOutPeers.len & "/" & $outTarget
# Always pick random connectable relay peers
shuffle(connectablePeers)
let length = min(outPeerDiff, connectablePeers.len)
for peer in connectablePeers[0 ..< length]:
trace "Peer To Connect To", peerId = $peer.peerId
peersToConnect.incl(peer.peerId)
await pm.pruneInRelayConns(peersToDisconnect)
if peersToConnect.len == 0:
return
let uniquePeers = toSeq(peersToConnect).mapIt(peerStore.getPeer(it))
# Connect to all nodes
for i in countup(0, uniquePeers.len - 1, MaxParallelDials):
let stop = min(i + MaxParallelDials, uniquePeers.len)
trace "Connecting to Peers", peerIds = $uniquePeers[i ..< stop]
await pm.connectToNodes(uniquePeers[i ..< stop])
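
# Illustrative sketch only, not part of the module API: the batching pattern
# used above to dial at most `MaxParallelDials` peers at a time. It returns the
# half-open [start, stop) index ranges such a loop walks through.
# `exampleBatches` and its parameters are hypothetical names.
func exampleBatches(total, batchSize: int): seq[(int, int)] =
  var start = 0
  while start < total:
    # The last batch may be shorter than batchSize.
    let stop = min(start + batchSize, total)
    result.add((start, stop))
    start += batchSize
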

proc prunePeerStore*(pm: PeerManager) =
  let peerStore = pm.switch.peerStore
  let numPeers = peerStore[AddressBook].book.len
  let capacity = peerStore.getCapacity()
  if numPeers <= capacity:
    return

  trace "Peer store capacity exceeded", numPeers = numPeers, capacity = capacity
  let pruningCount = numPeers - capacity
  var peersToPrune: HashSet[PeerId]

  # prune failed connections
  for peerId, count in peerStore[NumberFailedConnBook].book.pairs:
    if count < pm.maxFailedAttempts:
      continue

    if peersToPrune.len >= pruningCount:
      break

    peersToPrune.incl(peerId)

  var notConnected = peerStore.getDisconnectedPeers().mapIt(it.peerId)

  # Always pick random non-connected peers
  shuffle(notConnected)

  var shardlessPeers: seq[PeerId]
  var peersByShard = initTable[uint16, seq[PeerId]]()

  for peer in notConnected:
    if not peerStore[ENRBook].contains(peer):
      shardlessPeers.add(peer)
      continue

    let record = peerStore[ENRBook][peer]

    let rec = record.toTyped().valueOr:
      shardlessPeers.add(peer)
      continue

    let rs = rec.relaySharding().valueOr:
      shardlessPeers.add(peer)
      continue

    for shard in rs.shardIds:
      peersByShard.mgetOrPut(shard, @[]).add(peer)

  # prune not connected peers without shard
  for peer in shardlessPeers:
    if peersToPrune.len >= pruningCount:
      break

    peersToPrune.incl(peer)

  # calculate the avg peers per shard
  # NOTE: `min` caps `avg` at 1, so for every shard in the table `count` below
  # is `peers.len - 1` and the inclusive slice covers all peers of that shard.
  let total = sum(toSeq(peersByShard.values).mapIt(it.len))
  let avg = min(1, total div max(1, peersByShard.len))

  # prune peers from shard with higher than avg count
  for shard, peers in peersByShard.pairs:
    let count = max(peers.len - avg, 0)
    for peer in peers[0 .. count]:
      if peersToPrune.len >= pruningCount:
        break

      peersToPrune.incl(peer)

  for peer in peersToPrune:
    peerStore.delete(peer)

  let afterNumPeers = peerStore[AddressBook].book.len

  trace "Finished pruning peer store",
    beforeNumPeers = numPeers,
    afterNumPeers = afterNumPeers,
    capacity = capacity,
    pruned = peersToPrune.len
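
# Worked example for prunePeerStore, using hypothetical values that are not
# taken from this module: with numPeers = 120 and capacity = 100, pruningCount
# is 20. Candidates are collected in order: peers that reached
# maxFailedAttempts, then disconnected peers without shard information, then
# disconnected peers grouped by shard. Each stage stops adding candidates once
# 20 have been collected.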

# Prunes peers from peerstore to remove old/stale ones
proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
  trace "Starting prune peerstore loop"
  while pm.started:
    pm.prunePeerStore()
    await sleepAsync(PrunePeerStoreInterval)

# Ensures a healthy amount of connected relay peers
proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
  trace "Starting relay connectivity loop"
  while pm.started:
    if pm.shardedPeerManagement:
      await pm.manageRelayPeers()
    else:
      await pm.connectToRelayPeers()

    let
      (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
      excessInConns = max(inRelayPeers.len - pm.inRelayPeersTarget, 0)

      # One minus the percentage of excess connections relative to the target, limited to 100%
      # We calculate one minus this percentage because we want the factor to be inversely proportional to the number of excess peers
      inFactor = 1 - min(excessInConns / pm.inRelayPeersTarget, 1)
      # Percentage of out relay peers relative to the target
      outFactor = min(outRelayPeers.len / pm.outRelayPeersTarget, 1)
      factor = min(outFactor, inFactor)
      dynamicSleepInterval =
        chronos.seconds(int(float(ConnectivityLoopInterval.seconds()) * factor))

    # Shorten the connectivity loop interval dynamically based on percentage of peers to fill or connections to prune
    await sleepAsync(max(dynamicSleepInterval, chronos.seconds(1)))
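
# Worked example for the dynamic sleep interval, using hypothetical values
# (ConnectivityLoopInterval = 30 s, inRelayPeersTarget = 20,
# outRelayPeersTarget = 10; none of these are taken from this module):
#   25 inbound and 5 outbound relay peers:
#     excessInConns = max(25 - 20, 0)    = 5
#     inFactor      = 1 - min(5 / 20, 1) = 0.75
#     outFactor     = min(5 / 10, 1)     = 0.5
#     factor        = min(0.5, 0.75)     = 0.5  -> sleep int(30 * 0.5) = 15 s
#   20 inbound and 0 outbound relay peers:
#     factor = 0 -> computed interval is 0 s, raised to the 1 s minimum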

proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
  if amount <= 0:
    return

  let (inRelayPeers, _) = pm.connectedPeers(WakuRelayCodec)
  let connsToPrune = min(amount, inRelayPeers.len)

  for p in inRelayPeers[0 ..< connsToPrune]:
    trace "Pruning Peer", Peer = $p
    asyncSpawn(pm.evictPeer(p))

proc addExtPeerEventHandler*(
    pm: PeerManager, eventHandler: PeerEventHandler, eventKind: PeerEventKind
) =
  pm.switch.addPeerEventHandler(eventHandler, eventKind)

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Initialization and Constructor #
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#

proc setShardGetter*(pm: PeerManager, c: GetShards) =
  pm.getShards = c

proc start*(pm: PeerManager) =
  pm.started = true
  asyncSpawn pm.relayConnectivityLoop()
  asyncSpawn pm.prunePeerStoreLoop()
  asyncSpawn pm.logAndMetrics()

proc stop*(pm: PeerManager) =
  pm.started = false

proc new*(
    T: type PeerManager,
    switch: Switch,
    wakuMetadata: WakuMetadata = nil,
    maxRelayPeers: Option[int] = none(int),
    maxServicePeers: Option[int] = none(int),
    relayServiceRatio: string = "50:50",
    storage: PeerStorage = nil,
    initialBackoffInSec = InitialBackoffInSec,
    backoffFactor = BackoffFactor,
    maxFailedAttempts = MaxFailedAttempts,
    colocationLimit = DefaultColocationLimit,
    shardedPeerManagement = false,
    maxConnections: int = MaxConnections,
): PeerManager {.gcsafe.} =
  let capacity = switch.peerStore.capacity
  if maxConnections > capacity:
    error "Max number of connections can't be greater than PeerManager capacity",
      capacity = capacity, maxConnections = maxConnections
    raise newException(
      Defect, "Max number of connections can't be greater than PeerManager capacity"
    )

  var relayRatio: float64
  var serviceRatio: float64
  (relayRatio, serviceRatio) = parseRelayServiceRatio(relayServiceRatio).get()

  var relayPeers = int(ceil(float(maxConnections) * relayRatio))
  var servicePeers = int(floor(float(maxConnections) * serviceRatio))

  let minRelayPeers = WakuRelay.getDHigh()

  if relayPeers < minRelayPeers:
    let errorMsg =
      fmt"""Doesn't fulfill minimum criteria for relay (which increases the chance of the node becoming isolated).
    relayPeers: {relayPeers}, should be greater than or equal to minRelayPeers: {minRelayPeers}
    relayServiceRatio: {relayServiceRatio}
    maxConnections: {maxConnections}"""
    error "Wrong relay peers config", error = errorMsg
    return

  let outRelayPeersTarget = relayPeers div 3
  let inRelayPeersTarget = relayPeers - outRelayPeersTarget

  # attempt to calculate max backoff to prevent potential overflows or unreasonably high values
  let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
  if backoff.weeks() > 1:
    error "Max backoff time can't be over 1 week", maxBackoff = backoff
    raise newException(Defect, "Max backoff time can't be over 1 week")

  let brokerCtx = globalBrokerContext()

  let pm = PeerManager(
    switch: switch,
    brokerCtx: brokerCtx,
    wakuMetadata: wakuMetadata,
    storage: storage,
    initialBackoffInSec: initialBackoffInSec,
    backoffFactor: backoffFactor,
    maxRelayPeers: relayPeers,
    maxServicePeers: servicePeers,
    outRelayPeersTarget: outRelayPeersTarget,
    inRelayPeersTarget: inRelayPeersTarget,
    maxFailedAttempts: maxFailedAttempts,
    colocationLimit: colocationLimit,
    shardedPeerManagement: shardedPeerManagement,
    online: true,
    maxConnections: maxConnections,
  )

  proc peerHook(
      peerId: PeerId, event: PeerEvent
  ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
    try:
      await onPeerEvent(pm, peerId, event)
    except CatchableError:
      error "exception in onPeerEvent", error = getCurrentExceptionMsg()

  var peerStore = pm.switch.peerStore

  proc peerStoreChanged(peerId: PeerId) {.gcsafe.} =
    # The book length is the peer store size; no need to materialize the keys
    waku_peer_store_size.set(peerStore[AddressBook].book.len.int64)

  pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Joined)
  pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Left)

  # called every time the peerstore is updated
  peerStore[AddressBook].addHandler(peerStoreChanged)

  pm.serviceSlots = initTable[string, RemotePeerInfo]()
  pm.ipTable = initTable[string, seq[PeerId]]()
  pm.activeStoreRequests = initTable[PeerId, int]()

  if not storage.isNil():
    trace "found persistent peer storage"
    pm.loadFromStorage() # Load previously managed peers.
  else:
    trace "no peer storage found"

  return pm
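
# Worked example for the peer targets computed in `new`, using hypothetical
# values: maxConnections = 50 and a relayServiceRatio that parses to
# (0.5, 0.5), which is assumed here for "50:50":
#   relayPeers          = int(ceil(50 * 0.5))  = 25
#   servicePeers        = int(floor(50 * 0.5)) = 25
#   outRelayPeersTarget = 25 div 3             = 8
#   inRelayPeersTarget  = 25 - 8               = 17
# The example also assumes that 25 satisfies the WakuRelay.getDHigh() minimum.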