logos-delivery/waku/node/subscription_manager.nim


import std/[sequtils, sets, tables, options], chronos, chronicles, metrics, results
import libp2p/[peerid, peerinfo]
import brokers/broker_context
import
  waku/[
    waku_core,
    waku_core/topics/sharding,
    node/node_types,
    node/node_telemetry,
    waku_relay,
    waku_archive,
    waku_store_sync,
    waku_filter_v2/common as filter_common,
    waku_filter_v2/client as filter_client,
    waku_filter_v2/protocol as filter_protocol,
    events/health_events,
    events/message_events,
    events/peer_events,
    requests/health_requests,
    node/peer_manager,
    node/health_monitor/topic_health,
    node/health_monitor/connection_status,
  ]

{.push raises: [].}

proc doRelaySubscribe(
    node: WakuNode, shard: PubsubTopic, appHandler: WakuRelayHandler = nil
): bool =
  let alreadySubscribed = node.wakuRelay.isSubscribed(shard)

  if not appHandler.isNil():
    if not alreadySubscribed or not node.legacyAppHandlers.hasKey(shard):
      node.legacyAppHandlers[shard] = appHandler
    else:
      debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler",
        topic = shard

  if alreadySubscribed:
    return false

  proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    let msgSizeKB = msg.payload.len / 1000
    waku_node_messages.inc(labelValues = ["relay"])
    waku_histogram_message_size.observe(msgSizeKB)

  proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    if node.wakuFilter.isNil():
      return
    await node.wakuFilter.handleMessage(topic, msg)

  proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    if node.wakuArchive.isNil():
      return
    await node.wakuArchive.handleMessage(topic, msg)

  proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    if node.wakuStoreReconciliation.isNil():
      return
    node.wakuStoreReconciliation.messageIngress(topic, msg)

  proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    MessageSeenEvent.emit(node.brokerCtx, topic, msg)

  let uniqueTopicHandler = proc(
      topic: PubsubTopic, msg: WakuMessage
  ): Future[void] {.async, gcsafe.} =
    await traceHandler(topic, msg)
    await filterHandler(topic, msg)
    await archiveHandler(topic, msg)
    await syncHandler(topic, msg)
    await internalHandler(topic, msg)
    if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil():
      await node.legacyAppHandlers[topic](topic, msg)

  node.wakuRelay.subscribe(shard, uniqueTopicHandler)
  node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: shard))
  return true

proc doRelayUnsubscribe(node: WakuNode, shard: PubsubTopic) =
  if node.legacyAppHandlers.hasKey(shard):
    node.legacyAppHandlers.del(shard)

  if node.wakuRelay.isSubscribed(shard):
    node.wakuRelay.unsubscribe(shard)
    node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: shard))

proc new*(T: type SubscriptionManager, node: WakuNode): T =
  T(
    node: node,
    shards: initTable[PubsubTopic, ShardSubscription](),
    edgeFilterSubStates: initTable[PubsubTopic, EdgeFilterSubState](),
    edgeFilterWakeup: newAsyncEvent(),
  )

func wanted(entry: ShardSubscription): bool =
  ## True if the shard has content-topic interest or a direct subscription.
  return entry.contentTopics.len > 0 or entry.directShardSub

proc isContentSubscribed*(
    self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
): bool =
  self.shards.withValue(shard, sub):
    return contentTopic in sub.contentTopics
  return false

iterator subscribedContentTopics*(
    self: SubscriptionManager
): (PubsubTopic, HashSet[ContentTopic]) =
  ## Yields each shard with its non-empty content-topic set.
  for shard, sub in self.shards.pairs:
    if sub.contentTopics.len > 0:
      yield (shard, sub.contentTopics)

func toTopicHealth*(peersCount: int): TopicHealth =
  if peersCount >= HealthyThreshold:
    return TopicHealth.SUFFICIENTLY_HEALTHY
  elif peersCount > 0:
    return TopicHealth.MINIMALLY_HEALTHY
  else:
    return TopicHealth.UNHEALTHY

proc edgeFilterPeerCount*(self: SubscriptionManager, shard: PubsubTopic): int =
  self.edgeFilterSubStates.withValue(shard, state):
    return state.peers.len
  return 0

proc getShardForContentTopic(
    self: SubscriptionManager, topic: ContentTopic
): Result[PubsubTopic, string] =
  if self.node.wakuAutoSharding.isSome():
    let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
    return ok($shardObj)

  return err("autosharding is not configured; pass an explicit shard")

proc subscribeShard*(
    self: SubscriptionManager, shard: PubsubTopic, handler: WakuRelayHandler = nil
): Result[void, string] =
  ## Subscribes to the shard directly and joins the relay mesh.
  var added = false
  self.shards.withValue(shard, entry):
    if not entry.directShardSub:
      entry.directShardSub = true
      added = true
  do:
    self.shards[shard] = ShardSubscription(
      contentTopics: initHashSet[ContentTopic](), directShardSub: true
    )
    added = true

  if added:
    self.edgeFilterWakeup.fire()

  if not isNil(self.node.wakuRelay):
    discard self.node.doRelaySubscribe(shard, handler)

  return ok()

proc unsubscribeShard*(
    self: SubscriptionManager, shard: PubsubTopic
): Result[void, string] =
  ## Drops the direct shard subscription; unsubscribes the mesh if no content topic wants it.
  var removed = false
  var shardEmpty = false
  self.shards.withValue(shard, entry):
    if entry.directShardSub:
      entry.directShardSub = false
      removed = true
      shardEmpty = not entry[].wanted()

  if removed:
    self.edgeFilterWakeup.fire()

  if shardEmpty:
    self.shards.del(shard)
    if not isNil(self.node.wakuRelay):
      self.node.doRelayUnsubscribe(shard)

  return ok()

proc subscribe*(
    self: SubscriptionManager,
    shard: PubsubTopic,
    contentTopic: ContentTopic,
    handler: WakuRelayHandler = nil,
): Result[void, string] =
  ## Adds content-topic interest on the shard and joins the relay mesh.
  var added = false
  self.shards.withValue(shard, entry):
    if contentTopic notin entry.contentTopics:
      entry.contentTopics.incl(contentTopic)
      added = true
  do:
    var entry = ShardSubscription(contentTopics: initHashSet[ContentTopic]())
    entry.contentTopics.incl(contentTopic)
    self.shards[shard] = entry
    added = true

  if added:
    self.edgeFilterWakeup.fire()

  if not isNil(self.node.wakuRelay):
    discard self.node.doRelaySubscribe(shard, handler)

  return ok()

proc unsubscribe*(
    self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
): Result[void, string] =
  ## Drops content-topic interest on the shard; unsubscribes the mesh if nothing else wants it.
  var removed = false
  var shardEmpty = false
  self.shards.withValue(shard, entry):
    if contentTopic in entry.contentTopics:
      entry.contentTopics.excl(contentTopic)
      removed = true
      shardEmpty = not entry[].wanted()

  if removed:
    self.edgeFilterWakeup.fire()

  if shardEmpty:
    self.shards.del(shard)
    if not isNil(self.node.wakuRelay):
      self.node.doRelayUnsubscribe(shard)

  return ok()

proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] =
  ## Subscribes to a content topic, resolving its shard via autosharding.
  let shard = ?self.getShardForContentTopic(topic)
  return self.subscribe(shard, topic)

proc unsubscribe*(
    self: SubscriptionManager, topic: ContentTopic
): Result[void, string] =
  ## Unsubscribes from a content topic, resolving its shard via autosharding.
  let shard = ?self.getShardForContentTopic(topic)
  return self.unsubscribe(shard, topic)

proc unsubscribeAll*(
    self: SubscriptionManager, shard: PubsubTopic
): Result[void, string] =
  ## Drops every content topic on the shard, then the direct subscription.
  var snapshot: seq[ContentTopic]
  self.shards.withValue(shard, sub):
    snapshot = toSeq(sub.contentTopics)

  for contentTopic in snapshot:
    ?self.unsubscribe(shard, contentTopic)

  return self.unsubscribeShard(shard)

proc isSubscribed*(
    self: SubscriptionManager, topic: ContentTopic
): Result[bool, string] =
  let shard = ?self.getShardForContentTopic(topic)
  return ok(self.isContentSubscribed(shard, topic))

proc subscribeAllAutoshards*(self: SubscriptionManager): Result[void, string] =
  ## Subscribes the relay to every shard in the configured autosharding cluster.
  if self.node.wakuRelay.isNil() or self.node.wakuAutoSharding.isNone():
    return ok()

  let autoSharding = self.node.wakuAutoSharding.get()
  let numShards = autoSharding.shardCountGenZero
  if numShards == 0:
    return ok()

  for i in 0'u32 ..< numShards:
    let shardObj = RelayShard(clusterId: autoSharding.clusterId, shardId: uint16(i))
    self.subscribeShard(PubsubTopic($shardObj)).isOkOr:
      error "failed to auto-subscribe relay to cluster shard",
        shard = $shardObj, error = error

  ok()

{.pop.}

const EdgeFilterSubscribeTimeout = chronos.seconds(15)
  ## Timeout for a single filter subscribe/unsubscribe RPC to a service peer.

const EdgeFilterPingTimeout = chronos.seconds(5)
  ## Timeout for a filter ping health check.

const EdgeFilterLoopInterval = chronos.seconds(30)
  ## Interval for the edge filter health ping loop.

const EdgeFilterSubLoopDebounce = chronos.seconds(1)
  ## Debounce delay to coalesce rapid-fire wakeups into a single reconciliation pass.

type EdgeDialTask = object
  peer: RemotePeerInfo
  shard: PubsubTopic
  topics: seq[ContentTopic]

proc updateShardHealth(
    self: SubscriptionManager, shard: PubsubTopic, state: var EdgeFilterSubState
) =
  ## Recompute and emit health for a shard after its peer set changed.
  let newHealth = toTopicHealth(state.peers.len)
  if newHealth != state.currentHealth:
    state.currentHealth = newHealth
    EventShardTopicHealthChange.emit(self.node.brokerCtx, shard, newHealth)

proc removePeer(self: SubscriptionManager, shard: PubsubTopic, peerId: PeerId) =
## Remove a peer from edgeFilterSubStates for the given shard,
## update health, and wake the sub loop to dial a replacement.
## Best-effort unsubscribe so the service peer stops pushing to us.
self.edgeFilterSubStates.withValue(shard, state):
var idx = -1
for i, p in state.peers:
if p.peerId == peerId:
idx = i
break
if idx < 0:
return
let peer = state.peers[idx]
state.peers.del(idx)
self.updateShardHealth(shard, state[])
self.edgeFilterWakeup.fire()
if not self.node.wakuFilterClient.isNil():
self.shards.withValue(shard, sub):
let ct = toSeq(sub.contentTopics)
if ct.len > 0:
proc doUnsubscribe() {.async.} =
discard await self.node.wakuFilterClient.unsubscribe(peer, shard, ct)
asyncSpawn doUnsubscribe()
type SendChunkedFilterRpcKind = enum
FilterSubscribe
FilterUnsubscribe
proc sendChunkedFilterRpc(
self: SubscriptionManager,
peer: RemotePeerInfo,
shard: PubsubTopic,
topics: seq[ContentTopic],
kind: SendChunkedFilterRpcKind,
): Future[bool] {.async.} =
## Send a chunked filter subscribe or unsubscribe RPC. Returns true on
## success. On failure the peer is removed and false is returned.
try:
var i = 0
while i < topics.len:
let chunk =
topics[i ..< min(i + filter_protocol.MaxContentTopicsPerRequest, topics.len)]
let fut =
case kind
of FilterSubscribe:
self.node.wakuFilterClient.subscribe(peer, shard, chunk)
of FilterUnsubscribe:
self.node.wakuFilterClient.unsubscribe(peer, shard, chunk)
if not (await fut.withTimeout(EdgeFilterSubscribeTimeout)) or fut.read().isErr():
trace "sendChunkedFilterRpc: chunk failed",
op = kind, shard = shard, peer = peer.peerId
self.removePeer(shard, peer.peerId)
return false
i += filter_protocol.MaxContentTopicsPerRequest
except CatchableError as exc:
debug "sendChunkedFilterRpc: failed",
op = kind, shard = shard, peer = peer.peerId, err = exc.msg
self.removePeer(shard, peer.peerId)
return false
return true
proc syncFilterDeltas(
self: SubscriptionManager,
peer: RemotePeerInfo,
shard: PubsubTopic,
added: seq[ContentTopic],
removed: seq[ContentTopic],
) {.async.} =
## Push content topic changes (adds/removes) to an already-tracked peer.
if added.len > 0:
if not await self.sendChunkedFilterRpc(peer, shard, added, FilterSubscribe):
return
if removed.len > 0:
discard await self.sendChunkedFilterRpc(peer, shard, removed, FilterUnsubscribe)
proc dialFilterPeer(
self: SubscriptionManager,
peer: RemotePeerInfo,
shard: PubsubTopic,
contentTopics: seq[ContentTopic],
) {.async.} =
## Subscribe a new peer to all content topics on a shard and start tracking it.
self.edgeFilterSubStates.withValue(shard, state):
state.pendingPeers.incl(peer.peerId)
try:
if not await self.sendChunkedFilterRpc(peer, shard, contentTopics, FilterSubscribe):
return
self.edgeFilterSubStates.withValue(shard, state):
if state.peers.anyIt(it.peerId == peer.peerId):
trace "dialFilterPeer: peer already tracked, skipping duplicate",
shard = shard, peer = peer.peerId
return
state.peers.add(peer)
self.updateShardHealth(shard, state[])
trace "dialFilterPeer: successfully subscribed to all chunks",
shard = shard, peer = peer.peerId, totalPeers = state.peers.len
do:
trace "dialFilterPeer: shard removed while subscribing, discarding result",
shard = shard, peer = peer.peerId
finally:
self.edgeFilterSubStates.withValue(shard, state):
state.pendingPeers.excl(peer.peerId)
proc edgeFilterConnectionLoop(self: SubscriptionManager) {.async.} =
## Periodically pings all tracked filter service peers to verify they are
## still alive at the application layer. Peers that fail the ping are removed.
while true:
await sleepAsync(EdgeFilterLoopInterval)
if self.node.wakuFilterClient.isNil():
warn "filter client is nil within edge filter connection loop"
continue
var connected = initTable[PeerId, RemotePeerInfo]()
for state in self.edgeFilterSubStates.values:
for peer in state.peers:
if self.node.peerManager.switch.peerStore.isConnected(peer.peerId):
connected[peer.peerId] = peer
var alive = initHashSet[PeerId]()
if connected.len > 0:
var pingTasks: seq[(PeerId, Future[FilterSubscribeResult])]
for peer in connected.values:
pingTasks.add(
(peer.peerId, self.node.wakuFilterClient.ping(peer, EdgeFilterPingTimeout))
)
await allFutures(pingTasks.mapIt(it[1]))
for (peerId, task) in pingTasks:
if task.read().isOk():
alive.incl(peerId)
var changed = false
for shard, state in self.edgeFilterSubStates.mpairs:
let oldLen = state.peers.len
state.peers.keepItIf(it.peerId notin connected or alive.contains(it.peerId))
if state.peers.len < oldLen:
changed = true
self.updateShardHealth(shard, state)
trace "Edge Filter health degraded by Ping failure",
shard = shard, new = state.currentHealth
if changed:
self.edgeFilterWakeup.fire()
proc selectFilterCandidates(
self: SubscriptionManager, shard: PubsubTopic, exclude: HashSet[PeerId], needed: int
): seq[RemotePeerInfo] =
## Select filter service peer candidates for a shard.
# Start with every filter server peer that can serve the shard
var allCandidates = self.node.peerManager.selectPeers(
filter_common.WakuFilterSubscribeCodec, some(shard)
)
# Remove all already used in this shard or being dialed for it
allCandidates.keepItIf(it.peerId notin exclude)
# Collect peer IDs already tracked on other shards
var trackedOnOther = initHashSet[PeerId]()
for otherShard, otherState in self.edgeFilterSubStates.pairs:
if otherShard != shard:
for peer in otherState.peers:
trackedOnOther.incl(peer.peerId)
# Prefer peers we already have a connection to first, preserving shuffle
var candidates =
allCandidates.filterIt(it.peerId in trackedOnOther) &
allCandidates.filterIt(it.peerId notin trackedOnOther)
# We need to return 'needed' peers only
if candidates.len > needed:
candidates.setLen(needed)
return candidates
proc edgeFilterSubLoop(self: SubscriptionManager) {.async.} =
## Reconciles filter subscriptions with the desired state from SubscriptionManager.
var lastSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()
while true:
await self.edgeFilterWakeup.wait()
await sleepAsync(EdgeFilterSubLoopDebounce)
self.edgeFilterWakeup.clear()
trace "edgeFilterSubLoop: woke up"
if isNil(self.node.wakuFilterClient):
trace "edgeFilterSubLoop: wakuFilterClient is nil, skipping"
continue
var newSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()
var allShards: HashSet[PubsubTopic]
for shard, sub in self.shards.pairs:
if sub.contentTopics.len > 0:
newSynced[shard] = sub.contentTopics
allShards.incl(shard)
for shard in lastSynced.keys:
allShards.incl(shard)
trace "edgeFilterSubLoop: desired state", numShards = newSynced.len
# Step 1: read state across all shards at once and collect
# the peer dial tasks to run and the shard tracking entries to delete.
var dialTasks: seq[EdgeDialTask]
var shardsToDelete: seq[PubsubTopic]
for shard in allShards:
# Compute added/removed deltas via direct iteration; no HashSet copies.
var addedTopics: seq[ContentTopic]
var removedTopics: seq[ContentTopic]
newSynced.withValue(shard, curr):
lastSynced.withValue(shard, prev):
for t in curr[]:
if t notin prev[]:
addedTopics.add(t)
for t in prev[]:
if t notin curr[]:
removedTopics.add(t)
do:
for t in curr[]:
addedTopics.add(t)
do:
lastSynced.withValue(shard, prev):
for t in prev[]:
removedTopics.add(t)
discard self.edgeFilterSubStates.mgetOrPut(
shard, EdgeFilterSubState(currentHealth: TopicHealth.UNHEALTHY)
)
self.edgeFilterSubStates.withValue(shard, state):
state.peers.keepItIf(
self.node.peerManager.switch.peerStore.isConnected(it.peerId)
)
state.pending.keepItIf(not it.finished)
if addedTopics.len > 0 or removedTopics.len > 0:
for peer in state.peers:
asyncSpawn self.syncFilterDeltas(peer, shard, addedTopics, removedTopics)
if shard notin newSynced:
shardsToDelete.add(shard)
else:
self.updateShardHealth(shard, state[])
let needed = max(0, HealthyThreshold - state.peers.len - state.pending.len)
if needed > 0:
var tracked: HashSet[PeerId]
for p in state.peers:
tracked.incl(p.peerId)
for p in state.pendingPeers:
tracked.incl(p)
let candidates = self.selectFilterCandidates(shard, tracked, needed)
let toDial = min(needed, candidates.len)
trace "edgeFilterSubLoop: shard reconciliation",
shard = shard,
num_peers = state.peers.len,
num_pending = state.pending.len,
num_needed = needed,
num_available = candidates.len,
toDial = toDial
var dialTopics: seq[ContentTopic]
newSynced.withValue(shard, curr):
dialTopics = toSeq(curr[])
        for i in 0 ..< toDial:
          dialTasks.add(
            EdgeDialTask(peer: candidates[i], shard: shard, topics: dialTopics)
          )

    # Step 2: execute deferred shard tracking deletion and dial tasks.
    for shard in shardsToDelete:
      self.edgeFilterSubStates.withValue(shard, state):
        for fut in state.pending:
          if not fut.finished:
            await fut.cancelAndWait()
      self.edgeFilterSubStates.del(shard)

    for task in dialTasks:
      let fut = self.dialFilterPeer(task.peer, task.shard, task.topics)
      self.edgeFilterSubStates.withValue(task.shard, state):
        state.pending.add(fut)

    lastSynced = newSynced

proc startEdgeFilterLoops(self: SubscriptionManager): Result[void, string] =
  ## Start the edge filter orchestration loops.
  ## Caller must ensure this is only called in edge mode (relay nil, filter client present).
  self.peerEventListener = WakuPeerEvent.listen(
    self.node.brokerCtx,
    proc(evt: WakuPeerEvent) {.async: (raises: []), gcsafe.} =
      if evt.kind == WakuPeerEventKind.EventDisconnected or
          evt.kind == WakuPeerEventKind.EventMetadataUpdated:
        self.edgeFilterWakeup.fire()
    ,
  ).valueOr:
    return err("Failed to listen to peer events for edge filter: " & error)

  self.edgeFilterSubLoopFut = self.edgeFilterSubLoop()
  self.edgeFilterConnectionLoopFut = self.edgeFilterConnectionLoop()
  return ok()

proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} =
  ## Stop the edge filter orchestration loops and clean up pending futures.
  if not isNil(self.edgeFilterSubLoopFut):
    await self.edgeFilterSubLoopFut.cancelAndWait()
    self.edgeFilterSubLoopFut = nil

  if not isNil(self.edgeFilterConnectionLoopFut):
    await self.edgeFilterConnectionLoopFut.cancelAndWait()
    self.edgeFilterConnectionLoopFut = nil

  for shard, state in self.edgeFilterSubStates:
    for fut in state.pending:
      if not fut.finished:
        await fut.cancelAndWait()

  await WakuPeerEvent.dropListener(self.node.brokerCtx, self.peerEventListener)

proc start*(self: SubscriptionManager): Result[void, string] =
  let edgeShardHealthRes = RequestEdgeShardHealth.setProvider(
    self.node.brokerCtx,
    proc(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] =
      self.edgeFilterSubStates.withValue(shard, state):
        return ok(RequestEdgeShardHealth(health: state.currentHealth))
      return ok(RequestEdgeShardHealth(health: TopicHealth.NOT_SUBSCRIBED)),
  )
  self.ownsEdgeShardHealthProvider = edgeShardHealthRes.isOk()
  if edgeShardHealthRes.isErr():
    error "Can't set provider for RequestEdgeShardHealth",
      error = edgeShardHealthRes.error

  let edgeFilterPeerCountRes = RequestEdgeFilterPeerCount.setProvider(
    self.node.brokerCtx,
    proc(): Result[RequestEdgeFilterPeerCount, string] =
      var minPeers = high(int)
      for state in self.edgeFilterSubStates.values:
        minPeers = min(minPeers, state.peers.len)
      if minPeers == high(int):
        minPeers = 0
      return ok(RequestEdgeFilterPeerCount(peerCount: minPeers)),
  )
  self.ownsEdgeFilterPeerCountProvider = edgeFilterPeerCountRes.isOk()
  if edgeFilterPeerCountRes.isErr():
    error "Can't set provider for RequestEdgeFilterPeerCount",
      error = edgeFilterPeerCountRes.error

  # Start Edge workers only when we are in Edge mode (relay not mounted)
  # AND the filter client is mounted (otherwise the loops have nothing
  # to talk to and just spam "filter client is nil" warnings).
  if self.node.wakuRelay.isNil() and not self.node.wakuFilterClient.isNil():
    return self.startEdgeFilterLoops()

  return ok()

proc stop*(self: SubscriptionManager) {.async: (raises: []).} =
  # Stop Edge workers if we started them in `start` (Edge mode + filter client).
  if self.node.wakuRelay.isNil() and not self.node.wakuFilterClient.isNil():
    await self.stopEdgeFilterLoops()

  # Only clear providers we actually registered: another SubscriptionManager
  # sharing this brokerCtx may have won the race, and clearing its provider
  # would leave the broker silently provider-less.
  if self.ownsEdgeShardHealthProvider:
    RequestEdgeShardHealth.clearProvider(self.node.brokerCtx)
    self.ownsEdgeShardHealthProvider = false

  if self.ownsEdgeFilterPeerCountProvider:
    RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx)
    self.ownsEdgeFilterPeerCountProvider = false