import std/[sequtils, sets, tables, options, strutils], chronos, chronicles, results

import libp2p/[peerid, peerinfo]

import
  waku/[
    waku_core,
    waku_core/topics,
    waku_core/topics/sharding,
    waku_node,
    waku_relay,
    waku_filter_v2/common as filter_common,
    waku_filter_v2/client as filter_client,
    waku_filter_v2/protocol as filter_protocol,
    common/broker/broker_context,
    events/health_events,
    events/peer_events,
    requests/health_requests,
    node/peer_manager,
    node/health_monitor/topic_health,
    node/health_monitor/connection_status,
  ]

# ---------------------------------------------------------------------------
# Logos Messaging API SubscriptionManager
#
# Maps all topic subscription intent and centralizes all consistency
# maintenance of the pubsub and content topic subscription model across
# the various network drivers that handle topics (Edge/Filter and Core/Relay).
# ---------------------------------------------------------------------------

type EdgeFilterSubState* = object
  peers: seq[RemotePeerInfo]
    ## Filter service peers with confirmed subscriptions on this shard.
  pending: seq[Future[void]]
    ## In-flight dial futures for peers not yet confirmed.
  pendingPeers: HashSet[PeerId]
    ## PeerIds of peers currently being dialed.
  currentHealth: TopicHealth
    ## Cached health derived from peers.len; updated on every peer set change.

func toTopicHealth*(peersCount: int): TopicHealth =
  ## Map a confirmed-peer count to a shard health level.
  if peersCount >= HealthyThreshold:
    TopicHealth.SUFFICIENTLY_HEALTHY
  elif peersCount > 0:
    TopicHealth.MINIMALLY_HEALTHY
  else:
    TopicHealth.UNHEALTHY

type SubscriptionManager* = ref object of RootObj
  node: WakuNode
  contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
    ## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only.
    ## A present key with an empty HashSet value means pubsubtopic already subscribed
    ## (via subscribePubsubTopics()) but there's no specific content topic interest yet.
  edgeFilterSubStates*: Table[PubsubTopic, EdgeFilterSubState]
    ## Per-shard filter subscription state for edge mode.
  edgeFilterWakeup: AsyncEvent
    ## Signalled when the edge filter sub loop should re-reconcile.
  edgeFilterSubLoopFut: Future[void]
  edgeFilterHealthLoopFut: Future[void]
  peerEventListener: WakuPeerEventListener
    ## Listener for peer connect/disconnect events (edge filter wakeup).

iterator subscribedTopics*(
    self: SubscriptionManager
): (PubsubTopic, HashSet[ContentTopic]) =
  ## Yield every (shard, content topic set) pair currently tracked.
  for pubsub, topics in self.contentTopicSubs.pairs:
    yield (pubsub, topics)

proc edgeFilterPeerCount*(sm: SubscriptionManager, shard: PubsubTopic): int =
  ## Number of confirmed filter service peers for `shard` (0 if untracked).
  sm.edgeFilterSubStates.withValue(shard, state):
    return state.peers.len
  return 0

proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T =
  SubscriptionManager(
    node: node, contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]]()
  )

proc addContentTopicInterest(
    self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
  ## Record interest in `topic` on `shard`; wakes the edge filter loop if changed.
  var changed = false
  if not self.contentTopicSubs.hasKey(shard):
    self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
    changed = true
  self.contentTopicSubs.withValue(shard, cTopics):
    if not cTopics[].contains(topic):
      cTopics[].incl(topic)
      changed = true
  if changed and not isNil(self.edgeFilterWakeup):
    self.edgeFilterWakeup.fire()
  return ok()

proc removeContentTopicInterest(
    self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
  ## Drop interest in `topic` on `shard`; in edge mode (no relay) an emptied
  ## shard entry is deleted entirely. Wakes the edge filter loop if changed.
  var changed = false
  self.contentTopicSubs.withValue(shard, cTopics):
    if cTopics[].contains(topic):
      cTopics[].excl(topic)
      changed = true
    if cTopics[].len == 0 and isNil(self.node.wakuRelay):
      self.contentTopicSubs.del(shard) # We're done with cTopics here
  if changed and not isNil(self.edgeFilterWakeup):
    self.edgeFilterWakeup.fire()
  return ok()

proc subscribePubsubTopics(
    self: SubscriptionManager, shards: seq[PubsubTopic]
): Result[void, string] =
  ## Subscribe the relay to each shard not already tracked. Collects per-shard
  ## failures and reports them together instead of aborting on the first one.
  if isNil(self.node.wakuRelay):
    return err("subscribePubsubTopics requires a Relay")
  var errors: seq[string]
  for shard in shards:
    if not self.contentTopicSubs.hasKey(shard):
      self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr:
        errors.add("shard " & shard & ": " & error)
        continue
      self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
  if errors.len > 0:
    # NOTE: prefix matches this proc's name (was the stale "subscribeShard").
    return err("subscribePubsubTopics errors: " & errors.join("; "))
  return ok()

proc getShardForContentTopic(
    self: SubscriptionManager, topic: ContentTopic
): Result[PubsubTopic, string] =
  ## Resolve the pubsub shard for a content topic via autosharding.
  if self.node.wakuAutoSharding.isSome():
    let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
    return ok($shardObj)
  return err("SubscriptionManager requires AutoSharding")

proc isSubscribed*(
    self: SubscriptionManager, topic: ContentTopic
): Result[bool, string] =
  ## True if `topic` is tracked on its (auto-sharded) shard.
  let shard = ?self.getShardForContentTopic(topic)
  return ok(
    self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(topic)
  )

proc isSubscribed*(
    self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
): bool {.raises: [].} =
  ## True if `contentTopic` is tracked on the given `shard`.
  self.contentTopicSubs.withValue(shard, cTopics):
    return cTopics[].contains(contentTopic)
  return false

proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] =
  ## Subscribe to a content topic. In core mode also subscribes the relay to
  ## the shard on first use; in edge mode the filter sub loop picks it up.
  if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
    return err("SubscriptionManager requires either Relay or Filter Client.")
  let shard = ?self.getShardForContentTopic(topic)
  if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard):
    ?self.subscribePubsubTopics(@[shard])
  ?self.addContentTopicInterest(shard, topic)
  return ok()

proc unsubscribe*(
    self: SubscriptionManager, topic: ContentTopic
): Result[void, string] =
  ## Unsubscribe from a content topic (no-op if not currently subscribed).
  if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
    return err("SubscriptionManager requires either Relay or Filter Client.")
  let shard = ?self.getShardForContentTopic(topic)
  if self.isSubscribed(shard, topic):
    ?self.removeContentTopicInterest(shard, topic)
  return ok()

# ---------------------------------------------------------------------------
# Edge Filter driver for the Logos Messaging API
#
# The SubscriptionManager absorbs natively the responsibility of using the
# Edge Filter protocol to effect subscriptions and message receipt for edge.
# ---------------------------------------------------------------------------

const EdgeFilterSubscribeTimeout = chronos.seconds(15)
  ## Timeout for a single filter subscribe/unsubscribe RPC to a service peer.

const EdgeFilterPingTimeout = chronos.seconds(5)
  ## Timeout for a filter ping health check.

const EdgeFilterLoopInterval = chronos.seconds(30)
  ## Interval for the edge filter health ping loop.

const EdgeFilterSubLoopDebounce = chronos.seconds(1)
  ## Debounce delay to coalesce rapid-fire wakeups into a single reconciliation pass.

type EdgeDialTask = object
  peer: RemotePeerInfo
  shard: PubsubTopic
  topics: seq[ContentTopic]

proc updateShardHealth(
    self: SubscriptionManager, shard: PubsubTopic, state: var EdgeFilterSubState
) =
  ## Recompute and emit health for a shard after its peer set changed.
  let newHealth = toTopicHealth(state.peers.len)
  if newHealth != state.currentHealth:
    state.currentHealth = newHealth
    EventShardTopicHealthChange.emit(self.node.brokerCtx, shard, newHealth)

proc removePeer(self: SubscriptionManager, shard: PubsubTopic, peerId: PeerId) =
  ## Remove a peer from edgeFilterSubStates for the given shard,
  ## update health, and wake the sub loop to dial a replacement.
  ## Best-effort unsubscribe so the service peer stops pushing to us.
  self.edgeFilterSubStates.withValue(shard, state):
    var peer: RemotePeerInfo
    var found = false
    for p in state.peers:
      if p.peerId == peerId:
        peer = p
        found = true
        break
    if not found:
      return
    state.peers.keepItIf(it.peerId != peerId)
    self.updateShardHealth(shard, state[])
    self.edgeFilterWakeup.fire()
    if not self.node.wakuFilterClient.isNil():
      self.contentTopicSubs.withValue(shard, topics):
        let ct = toSeq(topics[])
        if ct.len > 0:
          proc doUnsubscribe() {.async.} =
            # Best-effort cleanup: swallow CatchableError here, otherwise a
            # routine network failure would escalate through asyncSpawn into
            # a FutureDefect and crash the node.
            try:
              discard await self.node.wakuFilterClient.unsubscribe(peer, shard, ct)
            except CatchableError as exc:
              trace "removePeer: best-effort unsubscribe failed",
                shard = shard, peer = peerId, err = exc.msg

          asyncSpawn doUnsubscribe()

type SendChunkedFilterRpcKind = enum
  FilterSubscribe
  FilterUnsubscribe

proc sendChunkedFilterRpc(
    self: SubscriptionManager,
    peer: RemotePeerInfo,
    shard: PubsubTopic,
    topics: seq[ContentTopic],
    kind: SendChunkedFilterRpcKind,
): Future[bool] {.async.} =
  ## Send a chunked filter subscribe or unsubscribe RPC. Returns true on
  ## success. On failure the peer is removed and false is returned.
  try:
    var i = 0
    while i < topics.len:
      # Respect the protocol's per-request content topic cap.
      let chunk =
        topics[i ..< min(i + filter_protocol.MaxContentTopicsPerRequest, topics.len)]
      let fut =
        case kind
        of FilterSubscribe:
          self.node.wakuFilterClient.subscribe(peer, shard, chunk)
        of FilterUnsubscribe:
          self.node.wakuFilterClient.unsubscribe(peer, shard, chunk)
      if not (await fut.withTimeout(EdgeFilterSubscribeTimeout)) or fut.read().isErr():
        trace "sendChunkedFilterRpc: chunk failed",
          op = kind, shard = shard, peer = peer.peerId
        self.removePeer(shard, peer.peerId)
        return false
      i += filter_protocol.MaxContentTopicsPerRequest
  except CatchableError as exc:
    debug "sendChunkedFilterRpc: failed",
      op = kind, shard = shard, peer = peer.peerId, err = exc.msg
    self.removePeer(shard, peer.peerId)
    return false
  return true

proc syncFilterDeltas(
    self: SubscriptionManager,
    peer: RemotePeerInfo,
    shard: PubsubTopic,
    added: seq[ContentTopic],
    removed: seq[ContentTopic],
) {.async.} =
  ## Push content topic changes (adds/removes) to an already-tracked peer.
  if added.len > 0:
    # If the subscribe failed the peer was already removed; skip the removes.
    if not await self.sendChunkedFilterRpc(peer, shard, added, FilterSubscribe):
      return
  if removed.len > 0:
    discard await self.sendChunkedFilterRpc(peer, shard, removed, FilterUnsubscribe)

proc dialFilterPeer(
    self: SubscriptionManager,
    peer: RemotePeerInfo,
    shard: PubsubTopic,
    contentTopics: seq[ContentTopic],
) {.async.} =
  ## Subscribe a new peer to all content topics on a shard and start tracking it.
  self.edgeFilterSubStates.withValue(shard, state):
    state.pendingPeers.incl(peer.peerId)
  try:
    if not await self.sendChunkedFilterRpc(peer, shard, contentTopics, FilterSubscribe):
      return
    self.edgeFilterSubStates.withValue(shard, state):
      if state.peers.anyIt(it.peerId == peer.peerId):
        trace "dialFilterPeer: peer already tracked, skipping duplicate",
          shard = shard, peer = peer.peerId
        return
      state.peers.add(peer)
      self.updateShardHealth(shard, state[])
      trace "dialFilterPeer: successfully subscribed to all chunks",
        shard = shard, peer = peer.peerId, totalPeers = state.peers.len
    do:
      trace "dialFilterPeer: shard removed while subscribing, discarding result",
        shard = shard, peer = peer.peerId
  finally:
    self.edgeFilterSubStates.withValue(shard, state):
      state.pendingPeers.excl(peer.peerId)

proc edgeFilterHealthLoop*(self: SubscriptionManager) {.async.} =
  ## Periodically pings all connected filter service peers to verify they are
  ## still alive at the application layer. Peers that fail the ping are removed.
  while true:
    await sleepAsync(EdgeFilterLoopInterval)
    if self.node.wakuFilterClient.isNil():
      warn "filter client is nil within edge filter health loop"
      continue
    var connected = initTable[PeerId, RemotePeerInfo]()
    for state in self.edgeFilterSubStates.values:
      for peer in state.peers:
        if self.node.peerManager.switch.peerStore.isConnected(peer.peerId):
          connected[peer.peerId] = peer
    var alive = initHashSet[PeerId]()
    if connected.len > 0:
      var pingTasks: seq[(PeerId, Future[FilterSubscribeResult])]
      for peer in connected.values:
        pingTasks.add(
          (peer.peerId, self.node.wakuFilterClient.ping(peer, EdgeFilterPingTimeout))
        )
      # extract future tasks from (PeerId, Future) tuples and await them
      await allFutures(pingTasks.mapIt(it[1]))
      for (peerId, task) in pingTasks:
        # Guard with completed(): read() on a failed future would re-raise
        # its exception and permanently kill this loop.
        if task.completed() and task.read().isOk():
          alive.incl(peerId)
    var changed = false
    for shard, state in self.edgeFilterSubStates.mpairs:
      let oldLen = state.peers.len
      # Drop connected peers that failed the ping; peers that are currently
      # disconnected are handled by the sub loop instead.
      state.peers.keepItIf(it.peerId notin connected or alive.contains(it.peerId))
      if state.peers.len < oldLen:
        changed = true
        self.updateShardHealth(shard, state)
        trace "Edge Filter health degraded by Ping failure",
          shard = shard, new = state.currentHealth
    if changed:
      self.edgeFilterWakeup.fire()

proc selectFilterCandidates(
    self: SubscriptionManager, shard: PubsubTopic, exclude: HashSet[PeerId], needed: int
): seq[RemotePeerInfo] =
  ## Select filter service peer candidates for a shard.
  # Start with every filter server peer that can serve the shard
  var allCandidates =
    self.node.peerManager.selectPeers(filter_common.WakuFilterSubscribeCodec, some(shard))
  # Remove all already used in this shard or being dialed for it
  allCandidates.keepItIf(it.peerId notin exclude)
  # Collect peer IDs already tracked on other shards
  var trackedOnOther = initHashSet[PeerId]()
  for otherShard, otherState in self.edgeFilterSubStates.pairs:
    if otherShard != shard:
      for peer in otherState.peers:
        trackedOnOther.incl(peer.peerId)
  # Prefer peers we already have a connection to first, preserving shuffle
  var candidates =
    allCandidates.filterIt(it.peerId in trackedOnOther) &
    allCandidates.filterIt(it.peerId notin trackedOnOther)
  # We need to return 'needed' peers only
  if candidates.len > needed:
    candidates.setLen(needed)
  return candidates

proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
  ## Reconciles filter subscriptions with the desired state from SubscriptionManager.
  var lastSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()
  while true:
    await self.edgeFilterWakeup.wait()
    # Debounce, then clear so wakeups during reconciliation re-trigger us.
    await sleepAsync(EdgeFilterSubLoopDebounce)
    self.edgeFilterWakeup.clear()
    trace "edgeFilterSubLoop: woke up"
    if isNil(self.node.wakuFilterClient):
      trace "edgeFilterSubLoop: wakuFilterClient is nil, skipping"
      continue
    let desired = self.contentTopicSubs
    trace "edgeFilterSubLoop: desired state", numShards = desired.len
    let allShards = toHashSet(toSeq(desired.keys)) + toHashSet(toSeq(lastSynced.keys))

    # Step 1: read state across all shards at once and
    # create a list of peer dial tasks and shard tracking to delete.
    var dialTasks: seq[EdgeDialTask]
    var shardsToDelete: seq[PubsubTopic]
    for shard in allShards:
      let currTopics = desired.getOrDefault(shard)
      let prevTopics = lastSynced.getOrDefault(shard)
      if shard notin self.edgeFilterSubStates:
        self.edgeFilterSubStates[shard] =
          EdgeFilterSubState(currentHealth: TopicHealth.UNHEALTHY)
      let addedTopics = toSeq(currTopics - prevTopics)
      let removedTopics = toSeq(prevTopics - currTopics)
      self.edgeFilterSubStates.withValue(shard, state):
        # Prune peers that dropped their connection and finished dials.
        state.peers.keepItIf(self.node.peerManager.switch.peerStore.isConnected(it.peerId))
        state.pending.keepItIf(not it.finished)
        if addedTopics.len > 0 or removedTopics.len > 0:
          for peer in state.peers:
            asyncSpawn self.syncFilterDeltas(peer, shard, addedTopics, removedTopics)
        if currTopics.len == 0:
          shardsToDelete.add(shard)
        else:
          self.updateShardHealth(shard, state[])
          let needed = max(0, HealthyThreshold - state.peers.len - state.pending.len)
          if needed > 0:
            let tracked = state.peers.mapIt(it.peerId).toHashSet() + state.pendingPeers
            let candidates = self.selectFilterCandidates(shard, tracked, needed)
            let toDial = min(needed, candidates.len)
            trace "edgeFilterSubLoop: shard reconciliation",
              shard = shard,
              num_peers = state.peers.len,
              num_pending = state.pending.len,
              num_needed = needed,
              num_available = candidates.len,
              toDial = toDial
            for i in 0 ..< toDial:
              dialTasks.add(
                EdgeDialTask(peer: candidates[i], shard: shard, topics: toSeq(currTopics))
              )

    # Step 2: execute deferred shard tracking deletion and dial tasks.
    for shard in shardsToDelete:
      self.edgeFilterSubStates.withValue(shard, state):
        for fut in state.pending:
          if not fut.finished:
            await fut.cancelAndWait()
      self.edgeFilterSubStates.del(shard)
    for task in dialTasks:
      let fut = self.dialFilterPeer(task.peer, task.shard, task.topics)
      self.edgeFilterSubStates.withValue(task.shard, state):
        state.pending.add(fut)
    lastSynced = desired

proc startEdgeFilterLoops(self: SubscriptionManager): Result[void, string] =
  ## Start the edge filter orchestration loops.
  ## Caller must ensure this is only called in edge mode (relay nil, filter client present).
  self.edgeFilterWakeup = newAsyncEvent()
  self.peerEventListener = WakuPeerEvent.listen(
    self.node.brokerCtx,
    proc(evt: WakuPeerEvent) {.async: (raises: []), gcsafe.} =
      if evt.kind == WakuPeerEventKind.EventDisconnected or
          evt.kind == WakuPeerEventKind.EventMetadataUpdated:
        self.edgeFilterWakeup.fire()
    ,
  ).valueOr:
    return err("Failed to listen to peer events for edge filter: " & error)
  self.edgeFilterSubLoopFut = self.edgeFilterSubLoop()
  self.edgeFilterHealthLoopFut = self.edgeFilterHealthLoop()
  return ok()

proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} =
  ## Stop the edge filter orchestration loops and clean up pending futures.
  if not isNil(self.edgeFilterSubLoopFut):
    await self.edgeFilterSubLoopFut.cancelAndWait()
    self.edgeFilterSubLoopFut = nil
  if not isNil(self.edgeFilterHealthLoopFut):
    await self.edgeFilterHealthLoopFut.cancelAndWait()
    self.edgeFilterHealthLoopFut = nil
  for shard, state in self.edgeFilterSubStates:
    for fut in state.pending:
      if not fut.finished:
        await fut.cancelAndWait()
  WakuPeerEvent.dropListener(self.node.brokerCtx, self.peerEventListener)

# ---------------------------------------------------------------------------
# SubscriptionManager Lifecycle (calls Edge behavior above)
#
# startSubscriptionManager and stopSubscriptionManager orchestrate both the
# core (relay) and edge (filter) paths, and register/clear broker providers.
# ---------------------------------------------------------------------------

proc startSubscriptionManager*(self: SubscriptionManager): Result[void, string] =
  # Register edge filter broker providers. The shard/content health providers
  # in WakuNode query these via the broker as a fallback when relay health is
  # not available. If edge mode is not active, these providers simply return
  # NOT_SUBSCRIBED / strength 0, which is harmless.
  RequestEdgeShardHealth.setProvider(
    self.node.brokerCtx,
    proc(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] =
      self.edgeFilterSubStates.withValue(shard, state):
        return ok(RequestEdgeShardHealth(health: state.currentHealth))
      return ok(RequestEdgeShardHealth(health: TopicHealth.NOT_SUBSCRIBED)),
  ).isOkOr:
    error "Can't set provider for RequestEdgeShardHealth", error = error

  RequestEdgeFilterPeerCount.setProvider(
    self.node.brokerCtx,
    proc(): Result[RequestEdgeFilterPeerCount, string] =
      # Report the weakest shard's peer count (0 when nothing is tracked).
      var minPeers = high(int)
      for state in self.edgeFilterSubStates.values:
        minPeers = min(minPeers, state.peers.len)
      if minPeers == high(int):
        minPeers = 0
      return ok(RequestEdgeFilterPeerCount(peerCount: minPeers)),
  ).isOkOr:
    error "Can't set provider for RequestEdgeFilterPeerCount", error = error

  if self.node.wakuRelay.isNil():
    return self.startEdgeFilterLoops()

  # Core mode: auto-subscribe relay to all shards in autosharding.
  if self.node.wakuAutoSharding.isSome():
    let autoSharding = self.node.wakuAutoSharding.get()
    let clusterId = autoSharding.clusterId
    let numShards = autoSharding.shardCountGenZero
    if numShards > 0:
      var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)
      for i in 0 ..< numShards:
        let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
        clusterPubsubTopics.add(PubsubTopic($shardObj))
      self.subscribePubsubTopics(clusterPubsubTopics).isOkOr:
        error "Failed to auto-subscribe Relay to cluster shards: ", error = error
  else:
    info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe."
  return ok()

proc stopSubscriptionManager*(self: SubscriptionManager) {.async: (raises: []).} =
  ## Stop edge loops (edge mode only) and clear the broker providers.
  if self.node.wakuRelay.isNil():
    await self.stopEdgeFilterLoops()
  RequestEdgeShardHealth.clearProvider(self.node.brokerCtx)
  RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx)