logos-delivery/waku/node/delivery_service/subscription_manager.nim
NagyZoltanPeter 42e0aa43d1
feat: persistency (#3880)
* persistency: per-job SQLite-backed storage layer (singleton, brokered)

Adds a backend-neutral CRUD library at waku/persistency/, plus the
nim-brokers dependency swap that enables it.

Architecture (ports-and-adapters):
  * Persistency: process-wide singleton, one root directory.
  * Job: one tenant, one DB file, one worker thread, one BrokerContext.
  * Backend: SQLite via waku/common/databases/db_sqlite. Uniform schema
    kv(category BLOB, key BLOB, payload BLOB) PRIMARY KEY (category, key)
    WITHOUT ROWID, WAL mode.
  * Writes are fire-and-forget via EventBroker(mt) PersistEvent.
  * Reads are async via five RequestBroker(mt) shapes (KvGet, KvExists,
    KvScan, KvCount, KvDelete). Reads return Result[T, PersistencyError].
  * One storage thread per job; tenants isolated by BrokerContext.

Public surface (waku/persistency/persistency.nim):
  Persistency.instance(rootDir) / Persistency.instance() / Persistency.reset()
  p.openJob(id) / p.closeJob(id) / p.dropJob(id) / p.close()
  p.job(id) / p[id] / p.hasJob(id)
  Writes (Job form & string-id form, fire-and-forget):
    persist / persistPut / persistDelete / persistEncoded
  Reads (Job form & string-id form, async Result):
    get / exists / scan / scanPrefix / count / deleteAcked

Key & payload encoding (keys.nim, payload.nim):
  * encodePart family + variadic key(...) / payload(...) macros +
    single-value toKey / toPayload.
  * Primitives: string and openArray[byte] are 2-byte BE length + bytes;
    int{8..64} are sign-flipped 8-byte BE; uint{16..64} are 8-byte BE;
    bool/byte/char are 1 byte; enums are int64(ord(v)).
  * Generic encodePart[T: tuple | object] recurses through fields() so
    any composite Nim type is encodable without ceremony.
  * Stable across Nim/C compiler upgrades: no sizeof, no memcpy, no
    cast on pointers, no host-endianness dependency.
  * `rawKey(bytes)` + `persistPut(..., openArray[byte])` let callers
    bypass the built-in encoder with their own format (CBOR, protobuf...).
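
The primitive encodings listed above can be illustrated with a minimal sketch. The helper names `encodeInt64`, `encodeString` and `lexLess` are hypothetical and are not taken from keys.nim; the sketch only assumes the layout stated above (sign-flipped 8-byte BE integers, 2-byte BE length prefix for strings) and shows why flipping the sign bit makes byte-wise comparison agree with numeric order.

```nim
proc encodeInt64(v: int64): seq[byte] =
  ## Hypothetical helper: flip the sign bit, then emit 8 bytes big-endian.
  ## A value cast only (no pointer cast), so the output is host-independent.
  let flipped = cast[uint64](v) xor (1'u64 shl 63)
  var buf = newSeq[byte](8)
  for i in 0 ..< 8:
    buf[i] = byte((flipped shr (8 * (7 - i))) and 0xFF'u64)
  return buf

proc encodeString(s: string): seq[byte] =
  ## Hypothetical helper: 2-byte big-endian length prefix, then the raw bytes.
  doAssert s.len <= 0xFFFF, "string too long for a 2-byte length prefix"
  var buf = @[byte(s.len shr 8), byte(s.len and 0xFF)]
  for c in s:
    buf.add(byte(c))
  return buf

proc lexLess(a, b: openArray[byte]): bool =
  ## Byte-wise lexicographic comparison, the order a BLOB key column sorts by.
  for i in 0 ..< min(a.len, b.len):
    if a[i] != b[i]:
      return a[i] < b[i]
  return a.len < b.len

when isMainModule:
  doAssert lexLess(encodeInt64(-1), encodeInt64(0))
  doAssert lexLess(encodeInt64(0), encodeInt64(1))
  doAssert encodeString("ab") == @[0'u8, 2, 97, 98]
```

Without the sign flip, the two's-complement bytes of -1 (all 0xFF) would sort after every non-negative value.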

Lifecycle:
  * Persistency.new is private; Persistency.instance is the only public
    constructor. Same rootDir is idempotent; conflicting rootDir is
    peInvalidArgument. Persistency.reset for test/restart paths.
  * openJob opens-or-creates the per-job SQLite file; an existing file
    is reused with its data preserved.
  * Teardown integration: Persistency.instance registers a Teardown
    MultiRequestBroker provider that closes all jobs and clears the
    singleton slot when Waku.stop() issues Teardown.request.

Internal layering:
  types.nim          pure value types (Key, KeyRange, KvRow, TxOp,
                     PersistencyError)
  keys.nim           encodePart primitives + key(...) macro
  payload.nim        toPayload + payload(...) macro
  schema.nim         CREATE TABLE + connection pragmas + user_version
  backend_sqlite.nim KvBackend, applyOps (single source of write SQL),
                     getOne/existsOne/deleteOne, scanRange (asc/desc,
                     half-open ranges, open-ended stop), countRange
  backend_comm.nim   EventBroker(mt) PersistEvent + 5 RequestBroker(mt)
                     declarations; encodeErr/decodeErr boundary helpers
  backend_thread.nim startStorageThread / stopStorageThread (shared
                     allocShared0 arg, cstring dbPath, atomic
                     ready/shutdown flags); per-thread provider
                     registration
  persistency.nim    Persistency + Job types, singleton state, public
                     facade
  ../requests/lifecycle_requests.nim
                     Teardown MultiRequestBroker
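
As an illustration of how a prefix scan can be expressed with the half-open ranges and open-ended stop mentioned for scanRange, here is a minimal sketch. `prefixStop` is a hypothetical name, and this is an assumption about one common way to derive the range, not a description of what backend_sqlite.nim actually does: the scan covers [prefix, stop), where stop is the smallest byte string greater than every key starting with prefix.

```nim
import std/options

proc prefixStop(prefix: openArray[byte]): Option[seq[byte]] =
  ## Hypothetical helper: exclusive upper bound for keys starting with `prefix`.
  ## Returns none when no finite bound exists (empty or all-0xFF prefix),
  ## which corresponds to an open-ended stop.
  var stop = @prefix
  # Trailing 0xFF bytes cannot be incremented; drop them and carry leftwards.
  while stop.len > 0 and stop[^1] == 0xFF'u8:
    stop.setLen(stop.len - 1)
  if stop.len == 0:
    return none(seq[byte])
  inc stop[^1]
  return some(stop)

when isMainModule:
  doAssert prefixStop([1'u8, 2]) == some(@[1'u8, 3])
  doAssert prefixStop([1'u8, 0xFF]) == some(@[2'u8])
  doAssert prefixStop([0xFF'u8]).isNone
```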

Tests (69 cases, all passing):
  test_keys.nim          sort-order invariants (length-prefix strings,
                         sign-flipped ints, composite tuples, prefix
                         range)
  test_backend.nim       round-trip / replace / delete-return-value /
                         batched atomicity / asc-desc-half-open-open-
                         ended scans / category isolation / batch
                         txDelete
  test_lifecycle.nim     open-or-create rootDir / non-dir collision /
                         reopen across sessions / idempotent openJob /
                         two-tenant parallel isolation / closeJob joins
                         worker / dropJob removes file / acked delete
  test_facade.nim        put-then-get / atomic batch / scanPrefix
                         asc/desc / deleteAcked hit-miss /
                         fire-and-forget delete / two-tenant facade
                         isolation
  test_encoding.nim      tuple/named-tuple/object keys, embedded Key,
                         enum encoding, field-major composite sort,
                         payload struct encoding, end-to-end struct
                         round-trip through SQLite
  test_string_lookup.nim peJobNotFound semantics / hasJob / subscript /
                         persistPut+get via id / reads short-circuit /
                         writes drop+warn / persistEncoded via id /
                         scan parity Job-ref vs id
  test_singleton.nim     idempotent same-rootDir / different-rootDir
                         rejection / no-arg instance lifecycle / reset
                         retargets / reset idempotence / Teardown.request
                         end-to-end

Prerequisite delivered in the same series: replace the in-tree broker
implementation with the external nim-brokers package; update all
broker call-sites (waku_filter_v2, waku_relay, waku_rln_relay,
delivery_service, peer_manager, requests/*, factory/*, api tests, etc.)
to the new package API; chat2 made to compile again.

Note: SDS adapter (Phase 5 of the design) is deferred -- nim-sds is
still developed side-by-side and the persistency layer is intentionally
SDS-agnostic.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* persistency: pin nim-brokers by URL+commit (workaround for stale registry)

The bare `brokers >= 2.0.1` form cannot resolve on machines where the
local nimble SAT solver enumerates only the registry-recorded 0.1.0 for
brokers. The nim-lang/packages entry for `brokers` carries no per-tag
metadata (only the URL), so until that registry entry is refreshed the
SAT solver clamps the available-versions list to 0.1.0 and rejects the
>= 2.0.1 constraint -- even though pkgs2 and pkgcache both have v2.0.1
cloned locally.

Pinning by URL+commit bypasses the registry path entirely. Inline
comment in waku.nimble documents the situation and the path back to
the bare form once nim-lang/packages is updated.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* persistency: nph format pass

Run `nph` on all 57 Nim files touched by this PR. Pure formatting:
17 files re-styled, no semantic change. Suite still 69/69.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* Fix build, add local-storage-path config, lazy init of Persistency from Waku start

* fix: fix nix deps

* fixes for nix build, regenerate deps

* reverting accidental dependency changes

* Fixing deps

* Apply suggestions from code review

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>

* persistency tests: migrate to suite / asyncTest / await

Match the in-tree test convention (procSuite -> suite, sync test +
waitFor -> asyncTest + await):

- procSuite "X": -> suite "X":
- For tests doing async work: test -> asyncTest, waitFor -> await.
- Poll helpers (proc waitFor(t: Job, ...) in test_lifecycle.nim,
  proc waitUntilExists(...) in test_facade.nim and
  test_string_lookup.nim) -> Future[bool] {.async.}, internal
  `waitFor X` -> `await X`, internal `sleep(N)` ->
  `await sleepAsync(chronos.milliseconds(N))`.
- Renamed test_lifecycle.nim's helper proc from `waitFor(t: Job, ...)`
  -> `pollExists(t: Job, ...)`; the previous name shadowed
  chronos.waitFor in the chronos macro expansion.
- `chronos.milliseconds(N)` explicitly qualified because `std/times`
  also exports `milliseconds` (returning TimeInterval, not Duration).
- `check await x` -> `let okN = await x; check okN` to dodge chronos's
  "yield in expr not lowered" with await-as-macro-argument.
- `(await x).foo()` -> `let awN = await x; ... awN.foo() ...` for the
  same reason.

waku/persistency/persistency.nim: nph also pulled the proc signatures
across multiple lines; restored explicit `Future[void] {.async.}`
return types after the colon (an intermediate nph pass had elided them).

Suite: 71 / 71 OK against the new async write surface.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* use idiomatic valueOr instead of ifs

* Reworked persistency shutdown, remove not necessary teardown mechanism

* Use const for DefaultStoragePath

* format to follow coding guidelines - no use of result and explicit returns - no functional change

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
2026-05-16 00:09:07 +02:00

597 lines
22 KiB
Nim

import std/[sequtils, sets, tables, options, strutils], chronos, chronicles, results
import libp2p/[peerid, peerinfo]
import brokers/broker_context
import
  waku/[
    waku_core,
    waku_core/topics,
    waku_core/topics/sharding,
    waku_node,
    waku_relay,
    waku_filter_v2/common as filter_common,
    waku_filter_v2/client as filter_client,
    waku_filter_v2/protocol as filter_protocol,
    events/health_events,
    events/peer_events,
    requests/health_requests,
    node/peer_manager,
    node/health_monitor/topic_health,
    node/health_monitor/connection_status,
  ]

# ---------------------------------------------------------------------------
# Logos Messaging API SubscriptionManager
#
# Maps all topic subscription intent and centralizes all consistency
# maintenance of the pubsub and content topic subscription model across
# the various network drivers that handle topics (Edge/Filter and Core/Relay).
# ---------------------------------------------------------------------------
type EdgeFilterSubState* = object
  peers: seq[RemotePeerInfo]
    ## Filter service peers with confirmed subscriptions on this shard.
  pending: seq[Future[void]] ## In-flight dial futures for peers not yet confirmed.
  pendingPeers: HashSet[PeerId] ## PeerIds of peers currently being dialed.
  currentHealth: TopicHealth
    ## Cached health derived from peers.len; updated on every peer set change.

func toTopicHealth*(peersCount: int): TopicHealth =
  if peersCount >= HealthyThreshold:
    TopicHealth.SUFFICIENTLY_HEALTHY
  elif peersCount > 0:
    TopicHealth.MINIMALLY_HEALTHY
  else:
    TopicHealth.UNHEALTHY

type SubscriptionManager* = ref object of RootObj
  node: WakuNode
  contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
    ## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only.
    ## A present key with an empty HashSet value means pubsubtopic already subscribed
    ## (via subscribePubsubTopics()) but there's no specific content topic interest yet.
  edgeFilterSubStates*: Table[PubsubTopic, EdgeFilterSubState]
    ## Per-shard filter subscription state for edge mode.
  edgeFilterWakeup: AsyncEvent
    ## Signalled when the edge filter sub loop should re-reconcile.
  edgeFilterSubLoopFut: Future[void]
  edgeFilterHealthLoopFut: Future[void]
  peerEventListener: WakuPeerEventListener
    ## Listener for peer connect/disconnect events (edge filter wakeup).

iterator subscribedTopics*(
    self: SubscriptionManager
): (PubsubTopic, HashSet[ContentTopic]) =
  ## Iterate over all subscribed content topics, batched per shard.
  ## Every yielded `topics` set (content topics) is guaranteed to be non-empty.
  for pubsub, topics in self.contentTopicSubs.pairs:
    # A shard can be subscribed (key present) without any content topic
    # interest (empty set). Skip such shards so that callers only ever see
    # shards that have at least one content topic subscription.
    if topics.len == 0:
      continue

    yield (pubsub, topics)

proc edgeFilterPeerCount*(sm: SubscriptionManager, shard: PubsubTopic): int =
  sm.edgeFilterSubStates.withValue(shard, state):
    return state.peers.len
  return 0

proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T =
  SubscriptionManager(
    node: node, contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]]()
  )

proc addContentTopicInterest(
    self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
  var changed = false
  if not self.contentTopicSubs.hasKey(shard):
    self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
    changed = true

  self.contentTopicSubs.withValue(shard, cTopics):
    if not cTopics[].contains(topic):
      cTopics[].incl(topic)
      changed = true

  if changed and not isNil(self.edgeFilterWakeup):
    self.edgeFilterWakeup.fire()

  return ok()

proc removeContentTopicInterest(
    self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
  var changed = false
  self.contentTopicSubs.withValue(shard, cTopics):
    if cTopics[].contains(topic):
      cTopics[].excl(topic)
      changed = true

      if cTopics[].len == 0 and isNil(self.node.wakuRelay):
        self.contentTopicSubs.del(shard) # We're done with cTopics here

  if changed and not isNil(self.edgeFilterWakeup):
    self.edgeFilterWakeup.fire()

  return ok()

proc subscribePubsubTopics(
    self: SubscriptionManager, shards: seq[PubsubTopic]
): Result[void, string] =
  if isNil(self.node.wakuRelay):
    return err("subscribePubsubTopics requires a Relay")

  var errors: seq[string]

  for shard in shards:
    if not self.contentTopicSubs.hasKey(shard):
      self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr:
        errors.add("shard " & shard & ": " & error)
        continue

      self.contentTopicSubs[shard] = initHashSet[ContentTopic]()

  if errors.len > 0:
    return err("subscribeShard errors: " & errors.join("; "))

  return ok()

proc getShardForContentTopic(
    self: SubscriptionManager, topic: ContentTopic
): Result[PubsubTopic, string] =
  if self.node.wakuAutoSharding.isSome():
    let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
    return ok($shardObj)

  return err("SubscriptionManager requires AutoSharding")

proc isSubscribed*(
    self: SubscriptionManager, topic: ContentTopic
): Result[bool, string] =
  let shard = ?self.getShardForContentTopic(topic)
  return ok(
    self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(topic)
  )

proc isSubscribed*(
    self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
): bool {.raises: [].} =
  self.contentTopicSubs.withValue(shard, cTopics):
    return cTopics[].contains(contentTopic)
  return false

proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] =
  if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
    return err("SubscriptionManager requires either Relay or Filter Client.")

  let shard = ?self.getShardForContentTopic(topic)

  if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard):
    ?self.subscribePubsubTopics(@[shard])

  ?self.addContentTopicInterest(shard, topic)

  return ok()

proc unsubscribe*(
    self: SubscriptionManager, topic: ContentTopic
): Result[void, string] =
  if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
    return err("SubscriptionManager requires either Relay or Filter Client.")

  let shard = ?self.getShardForContentTopic(topic)

  if self.isSubscribed(shard, topic):
    ?self.removeContentTopicInterest(shard, topic)

  return ok()

# ---------------------------------------------------------------------------
# Edge Filter driver for the Logos Messaging API
#
# The SubscriptionManager absorbs natively the responsibility of using the
# Edge Filter protocol to effect subscriptions and message receipt for edge.
# ---------------------------------------------------------------------------
const EdgeFilterSubscribeTimeout = chronos.seconds(15)
  ## Timeout for a single filter subscribe/unsubscribe RPC to a service peer.
const EdgeFilterPingTimeout = chronos.seconds(5)
  ## Timeout for a filter ping health check.
const EdgeFilterLoopInterval = chronos.seconds(30)
  ## Interval for the edge filter health ping loop.
const EdgeFilterSubLoopDebounce = chronos.seconds(1)
  ## Debounce delay to coalesce rapid-fire wakeups into a single reconciliation pass.

type EdgeDialTask = object
  peer: RemotePeerInfo
  shard: PubsubTopic
  topics: seq[ContentTopic]

proc updateShardHealth(
    self: SubscriptionManager, shard: PubsubTopic, state: var EdgeFilterSubState
) =
  ## Recompute and emit health for a shard after its peer set changed.
  let newHealth = toTopicHealth(state.peers.len)
  if newHealth != state.currentHealth:
    state.currentHealth = newHealth
    EventShardTopicHealthChange.emit(self.node.brokerCtx, shard, newHealth)

proc removePeer(self: SubscriptionManager, shard: PubsubTopic, peerId: PeerId) =
  ## Remove a peer from edgeFilterSubStates for the given shard,
  ## update health, and wake the sub loop to dial a replacement.
  ## Best-effort unsubscribe so the service peer stops pushing to us.
  self.edgeFilterSubStates.withValue(shard, state):
    var peer: RemotePeerInfo
    var found = false
    for p in state.peers:
      if p.peerId == peerId:
        peer = p
        found = true
        break
    if not found:
      return

    state.peers.keepItIf(it.peerId != peerId)
    self.updateShardHealth(shard, state[])
    self.edgeFilterWakeup.fire()

    if not self.node.wakuFilterClient.isNil():
      self.contentTopicSubs.withValue(shard, topics):
        let ct = toSeq(topics[])
        if ct.len > 0:
          proc doUnsubscribe() {.async.} =
            discard await self.node.wakuFilterClient.unsubscribe(peer, shard, ct)

          asyncSpawn doUnsubscribe()

type SendChunkedFilterRpcKind = enum
  FilterSubscribe
  FilterUnsubscribe

proc sendChunkedFilterRpc(
    self: SubscriptionManager,
    peer: RemotePeerInfo,
    shard: PubsubTopic,
    topics: seq[ContentTopic],
    kind: SendChunkedFilterRpcKind,
): Future[bool] {.async.} =
  ## Send a chunked filter subscribe or unsubscribe RPC. Returns true on
  ## success. On failure the peer is removed and false is returned.
  try:
    var i = 0
    while i < topics.len:
      let chunk =
        topics[i ..< min(i + filter_protocol.MaxContentTopicsPerRequest, topics.len)]
      let fut =
        case kind
        of FilterSubscribe:
          self.node.wakuFilterClient.subscribe(peer, shard, chunk)
        of FilterUnsubscribe:
          self.node.wakuFilterClient.unsubscribe(peer, shard, chunk)
      if not (await fut.withTimeout(EdgeFilterSubscribeTimeout)) or fut.read().isErr():
        trace "sendChunkedFilterRpc: chunk failed",
          op = kind, shard = shard, peer = peer.peerId
        self.removePeer(shard, peer.peerId)
        return false
      i += filter_protocol.MaxContentTopicsPerRequest
  except CatchableError as exc:
    debug "sendChunkedFilterRpc: failed",
      op = kind, shard = shard, peer = peer.peerId, err = exc.msg
    self.removePeer(shard, peer.peerId)
    return false
  return true

proc syncFilterDeltas(
    self: SubscriptionManager,
    peer: RemotePeerInfo,
    shard: PubsubTopic,
    added: seq[ContentTopic],
    removed: seq[ContentTopic],
) {.async.} =
  ## Push content topic changes (adds/removes) to an already-tracked peer.
  if added.len > 0:
    if not await self.sendChunkedFilterRpc(peer, shard, added, FilterSubscribe):
      return

  if removed.len > 0:
    discard await self.sendChunkedFilterRpc(peer, shard, removed, FilterUnsubscribe)

proc dialFilterPeer(
    self: SubscriptionManager,
    peer: RemotePeerInfo,
    shard: PubsubTopic,
    contentTopics: seq[ContentTopic],
) {.async.} =
  ## Subscribe a new peer to all content topics on a shard and start tracking it.
  self.edgeFilterSubStates.withValue(shard, state):
    state.pendingPeers.incl(peer.peerId)

  try:
    if not await self.sendChunkedFilterRpc(peer, shard, contentTopics, FilterSubscribe):
      return

    self.edgeFilterSubStates.withValue(shard, state):
      if state.peers.anyIt(it.peerId == peer.peerId):
        trace "dialFilterPeer: peer already tracked, skipping duplicate",
          shard = shard, peer = peer.peerId
        return

      state.peers.add(peer)
      self.updateShardHealth(shard, state[])
      trace "dialFilterPeer: successfully subscribed to all chunks",
        shard = shard, peer = peer.peerId, totalPeers = state.peers.len
    do:
      trace "dialFilterPeer: shard removed while subscribing, discarding result",
        shard = shard, peer = peer.peerId
  finally:
    self.edgeFilterSubStates.withValue(shard, state):
      state.pendingPeers.excl(peer.peerId)

proc edgeFilterHealthLoop*(self: SubscriptionManager) {.async.} =
  ## Periodically pings all connected filter service peers to verify they are
  ## still alive at the application layer. Peers that fail the ping are removed.
  while true:
    await sleepAsync(EdgeFilterLoopInterval)

    if self.node.wakuFilterClient.isNil():
      warn "filter client is nil within edge filter health loop"
      continue

    var connected = initTable[PeerId, RemotePeerInfo]()
    for state in self.edgeFilterSubStates.values:
      for peer in state.peers:
        if self.node.peerManager.switch.peerStore.isConnected(peer.peerId):
          connected[peer.peerId] = peer

    var alive = initHashSet[PeerId]()

    if connected.len > 0:
      var pingTasks: seq[(PeerId, Future[FilterSubscribeResult])]
      for peer in connected.values:
        pingTasks.add(
          (peer.peerId, self.node.wakuFilterClient.ping(peer, EdgeFilterPingTimeout))
        )

      # extract future tasks from (PeerId, Future) tuples and await them
      await allFutures(pingTasks.mapIt(it[1]))

      for (peerId, task) in pingTasks:
        if task.read().isOk():
          alive.incl(peerId)

    var changed = false
    for shard, state in self.edgeFilterSubStates.mpairs:
      let oldLen = state.peers.len
      state.peers.keepItIf(it.peerId notin connected or alive.contains(it.peerId))

      if state.peers.len < oldLen:
        changed = true
        self.updateShardHealth(shard, state)
        trace "Edge Filter health degraded by Ping failure",
          shard = shard, new = state.currentHealth

    if changed:
      self.edgeFilterWakeup.fire()

proc selectFilterCandidates(
    self: SubscriptionManager, shard: PubsubTopic, exclude: HashSet[PeerId], needed: int
): seq[RemotePeerInfo] =
  ## Select filter service peer candidates for a shard.

  # Start with every filter server peer that can serve the shard
  var allCandidates = self.node.peerManager.selectPeers(
    filter_common.WakuFilterSubscribeCodec, some(shard)
  )

  # Remove all already used in this shard or being dialed for it
  allCandidates.keepItIf(it.peerId notin exclude)

  # Collect peer IDs already tracked on other shards
  var trackedOnOther = initHashSet[PeerId]()
  for otherShard, otherState in self.edgeFilterSubStates.pairs:
    if otherShard != shard:
      for peer in otherState.peers:
        trackedOnOther.incl(peer.peerId)

  # Prefer peers we already have a connection to first, preserving shuffle
  var candidates =
    allCandidates.filterIt(it.peerId in trackedOnOther) &
    allCandidates.filterIt(it.peerId notin trackedOnOther)

  # We need to return 'needed' peers only
  if candidates.len > needed:
    candidates.setLen(needed)

  return candidates

proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
  ## Reconciles filter subscriptions with the desired state from SubscriptionManager.
  var lastSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()

  while true:
    await self.edgeFilterWakeup.wait()
    await sleepAsync(EdgeFilterSubLoopDebounce)
    self.edgeFilterWakeup.clear()
    trace "edgeFilterSubLoop: woke up"

    if isNil(self.node.wakuFilterClient):
      trace "edgeFilterSubLoop: wakuFilterClient is nil, skipping"
      continue

    let desired = self.contentTopicSubs

    trace "edgeFilterSubLoop: desired state", numShards = desired.len

    let allShards = toHashSet(toSeq(desired.keys)) + toHashSet(toSeq(lastSynced.keys))

    # Step 1: read state across all shards at once and
    # create a list of peer dial tasks and shard tracking to delete.
    var dialTasks: seq[EdgeDialTask]
    var shardsToDelete: seq[PubsubTopic]

    for shard in allShards:
      let currTopics = desired.getOrDefault(shard)
      let prevTopics = lastSynced.getOrDefault(shard)

      if shard notin self.edgeFilterSubStates:
        self.edgeFilterSubStates[shard] =
          EdgeFilterSubState(currentHealth: TopicHealth.UNHEALTHY)

      let addedTopics = toSeq(currTopics - prevTopics)
      let removedTopics = toSeq(prevTopics - currTopics)

      self.edgeFilterSubStates.withValue(shard, state):
        state.peers.keepItIf(
          self.node.peerManager.switch.peerStore.isConnected(it.peerId)
        )
        state.pending.keepItIf(not it.finished)

        if addedTopics.len > 0 or removedTopics.len > 0:
          for peer in state.peers:
            asyncSpawn self.syncFilterDeltas(peer, shard, addedTopics, removedTopics)

        if currTopics.len == 0:
          shardsToDelete.add(shard)
        else:
          self.updateShardHealth(shard, state[])

          let needed = max(0, HealthyThreshold - state.peers.len - state.pending.len)

          if needed > 0:
            let tracked = state.peers.mapIt(it.peerId).toHashSet() + state.pendingPeers
            let candidates = self.selectFilterCandidates(shard, tracked, needed)
            let toDial = min(needed, candidates.len)

            trace "edgeFilterSubLoop: shard reconciliation",
              shard = shard,
              num_peers = state.peers.len,
              num_pending = state.pending.len,
              num_needed = needed,
              num_available = candidates.len,
              toDial = toDial

            for i in 0 ..< toDial:
              dialTasks.add(
                EdgeDialTask(
                  peer: candidates[i], shard: shard, topics: toSeq(currTopics)
                )
              )

    # Step 2: execute deferred shard tracking deletion and dial tasks.
    for shard in shardsToDelete:
      self.edgeFilterSubStates.withValue(shard, state):
        for fut in state.pending:
          if not fut.finished:
            await fut.cancelAndWait()
      self.edgeFilterSubStates.del(shard)

    for task in dialTasks:
      let fut = self.dialFilterPeer(task.peer, task.shard, task.topics)
      self.edgeFilterSubStates.withValue(task.shard, state):
        state.pending.add(fut)

    lastSynced = desired

proc startEdgeFilterLoops(self: SubscriptionManager): Result[void, string] =
  ## Start the edge filter orchestration loops.
  ## Caller must ensure this is only called in edge mode (relay nil, filter client present).
  self.edgeFilterWakeup = newAsyncEvent()

  self.peerEventListener = WakuPeerEvent.listen(
    self.node.brokerCtx,
    proc(evt: WakuPeerEvent) {.async: (raises: []), gcsafe.} =
      if evt.kind == WakuPeerEventKind.EventDisconnected or
          evt.kind == WakuPeerEventKind.EventMetadataUpdated:
        self.edgeFilterWakeup.fire()
    ,
  ).valueOr:
    return err("Failed to listen to peer events for edge filter: " & error)

  self.edgeFilterSubLoopFut = self.edgeFilterSubLoop()
  self.edgeFilterHealthLoopFut = self.edgeFilterHealthLoop()
  return ok()

proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} =
  ## Stop the edge filter orchestration loops and clean up pending futures.
  if not isNil(self.edgeFilterSubLoopFut):
    await self.edgeFilterSubLoopFut.cancelAndWait()
    self.edgeFilterSubLoopFut = nil

  if not isNil(self.edgeFilterHealthLoopFut):
    await self.edgeFilterHealthLoopFut.cancelAndWait()
    self.edgeFilterHealthLoopFut = nil

  for shard, state in self.edgeFilterSubStates:
    for fut in state.pending:
      if not fut.finished:
        await fut.cancelAndWait()

  await WakuPeerEvent.dropListener(self.node.brokerCtx, self.peerEventListener)

# ---------------------------------------------------------------------------
# SubscriptionManager Lifecycle (calls Edge behavior above)
#
# startSubscriptionManager and stopSubscriptionManager orchestrate both the
# core (relay) and edge (filter) paths, and register/clear broker providers.
# ---------------------------------------------------------------------------
proc startSubscriptionManager*(self: SubscriptionManager): Result[void, string] =
  # Register edge filter broker providers. The shard/content health providers
  # in WakuNode query these via the broker as a fallback when relay health is
  # not available. If edge mode is not active, these providers simply return
  # NOT_SUBSCRIBED / strength 0, which is harmless.
  RequestEdgeShardHealth.setProvider(
    self.node.brokerCtx,
    proc(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] =
      self.edgeFilterSubStates.withValue(shard, state):
        return ok(RequestEdgeShardHealth(health: state.currentHealth))
      return ok(RequestEdgeShardHealth(health: TopicHealth.NOT_SUBSCRIBED)),
  ).isOkOr:
    error "Can't set provider for RequestEdgeShardHealth", error = error

  RequestEdgeFilterPeerCount.setProvider(
    self.node.brokerCtx,
    proc(): Result[RequestEdgeFilterPeerCount, string] =
      var minPeers = high(int)
      for state in self.edgeFilterSubStates.values:
        minPeers = min(minPeers, state.peers.len)
      if minPeers == high(int):
        minPeers = 0
      return ok(RequestEdgeFilterPeerCount(peerCount: minPeers)),
  ).isOkOr:
    error "Can't set provider for RequestEdgeFilterPeerCount", error = error

  if self.node.wakuRelay.isNil():
    return self.startEdgeFilterLoops()

  # Core mode: auto-subscribe relay to all shards in autosharding.
  if self.node.wakuAutoSharding.isSome():
    let autoSharding = self.node.wakuAutoSharding.get()
    let clusterId = autoSharding.clusterId
    let numShards = autoSharding.shardCountGenZero

    if numShards > 0:
      var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)

      for i in 0 ..< numShards:
        let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
        clusterPubsubTopics.add(PubsubTopic($shardObj))

      self.subscribePubsubTopics(clusterPubsubTopics).isOkOr:
        error "Failed to auto-subscribe Relay to cluster shards: ", error = error
  else:
    info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe."

  return ok()

proc stopSubscriptionManager*(self: SubscriptionManager) {.async: (raises: []).} =
  if self.node.wakuRelay.isNil():
    await self.stopEdgeFilterLoops()
  RequestEdgeShardHealth.clearProvider(self.node.brokerCtx)
  RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx)