import std/[sequtils, sets, tables, options, strutils], chronos, chronicles, results
import libp2p/[peerid, peerinfo]
import waku/[
  waku_core,
  waku_core/topics,
  waku_core/topics/sharding,
  waku_node,
  waku_relay,
  waku_filter_v2/common as filter_common,
  waku_filter_v2/client as filter_client,
  waku_filter_v2/protocol as filter_protocol,
  common/broker/broker_context,
  events/health_events,
  events/peer_events,
  requests/health_requests,
  node/peer_manager,
  node/health_monitor/topic_health,
  node/health_monitor/connection_status,
]

# ---------------------------------------------------------------------------
# Logos Messaging API SubscriptionManager
#
# Maps all topic subscription intent and centralizes all consistency
# maintenance of the pubsub and content topic subscription model across
# the various network drivers that handle topics (Edge/Filter and Core/Relay).
# ---------------------------------------------------------------------------

type EdgeFilterSubState* = object
  peers: seq[RemotePeerInfo]
    ## Filter service peers with confirmed subscriptions on this shard.
  pending: seq[Future[void]]
    ## In-flight dial futures for peers not yet confirmed.
  pendingPeers: HashSet[PeerId]
    ## PeerIds of peers currently being dialed.
  currentHealth: TopicHealth
    ## Cached health derived from peers.len; updated on every peer set change.

func toTopicHealth*(peersCount: int): TopicHealth =
  ## Map a number of confirmed filter service peers to a shard health level:
  ## at least `HealthyThreshold` peers is SUFFICIENTLY_HEALTHY, any positive
  ## count below that is MINIMALLY_HEALTHY, and zero is UNHEALTHY.
  if peersCount >= HealthyThreshold:
    TopicHealth.SUFFICIENTLY_HEALTHY
  elif peersCount > 0:
    TopicHealth.MINIMALLY_HEALTHY
  else:
    TopicHealth.UNHEALTHY

type SubscriptionManager* = ref object of RootObj
  node: WakuNode
  contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
    ## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only.
    ## A present key with an empty HashSet value means pubsubtopic already subscribed
    ## (via subscribePubsubTopics()) but there's no specific content topic interest yet.
  edgeFilterSubStates*: Table[PubsubTopic, EdgeFilterSubState]
    ## Per-shard filter subscription state for edge mode.
  edgeFilterWakeup: AsyncEvent
    ## Signalled when the edge filter sub loop should re-reconcile.
  edgeFilterSubLoopFut: Future[void]
  edgeFilterHealthLoopFut: Future[void]
  peerEventListener: WakuPeerEventListener
    ## Listener for peer connect/disconnect events (edge filter wakeup).

iterator subscribedTopics*(
    self: SubscriptionManager
): (PubsubTopic, HashSet[ContentTopic]) =
  ## Yield every tracked shard together with its set of content topics
  ## (the set may be empty for a relay-only pubsub subscription).
  for pubsub, topics in self.contentTopicSubs.pairs:
    yield (pubsub, topics)

proc edgeFilterPeerCount*(sm: SubscriptionManager, shard: PubsubTopic): int =
  ## Number of confirmed edge filter service peers for `shard`
  ## (0 when the shard has no edge filter state).
  sm.edgeFilterSubStates.withValue(shard, state):
    return state.peers.len
  return 0

proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T =
  ## Create a SubscriptionManager bound to `node` with no subscriptions yet.
  SubscriptionManager(
    node: node, contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]]()
  )

proc addContentTopicInterest(
    self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
  ## Record interest in `topic` on `shard`, creating the shard entry if needed.
  ## Fires the edge filter wakeup (when one exists) only if the desired state
  ## actually changed. Always returns ok().
  # containsOrIncl returns true when `topic` was already in the set. A shard
  # entry that mgetOrPut just created is empty, so a new shard always counts
  # as a change, matching the "new key" case.
  let alreadyPresent = self.contentTopicSubs
    .mgetOrPut(shard, initHashSet[ContentTopic]())
    .containsOrIncl(topic)

  if not alreadyPresent and not isNil(self.edgeFilterWakeup):
    self.edgeFilterWakeup.fire()

  return ok()

proc removeContentTopicInterest(
    self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
  ## Drop interest in `topic` on `shard`.
  ## When no Relay is mounted, a shard left with no content topics is removed
  ## entirely. With Relay the empty entry is kept, because an empty set records
  ## a pubsub-level subscription. Fires the edge filter wakeup (when one exists)
  ## only if the topic was actually present. Always returns ok().
  var changed = false
  var dropShard = false

  self.contentTopicSubs.withValue(shard, cTopics):
    # missingOrExcl returns true when `topic` was not in the set.
    changed = not cTopics[].missingOrExcl(topic)
    dropShard = cTopics[].len == 0 and isNil(self.node.wakuRelay)

  # Delete only after `withValue` has finished, so no pointer into the table
  # is alive while the table is being mutated.
  if dropShard:
    self.contentTopicSubs.del(shard)

  if changed and not isNil(self.edgeFilterWakeup):
    self.edgeFilterWakeup.fire()

  return ok()

proc subscribePubsubTopics(
    self: SubscriptionManager, shards: seq[PubsubTopic]
): Result[void, string] =
  ## Subscribe Relay to each shard not already tracked, and register an empty
  ## content-topic set for every shard that subscribed successfully.
  ## Keeps going past individual failures and reports them all in one error.
  if isNil(self.node.wakuRelay):
    return err("subscribePubsubTopics requires a Relay")

  var errors: seq[string] = @[]

  for shard in shards:
    if not self.contentTopicSubs.hasKey(shard):
      self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr:
        errors.add("shard " & shard & ": " & error)
        continue

      self.contentTopicSubs[shard] = initHashSet[ContentTopic]()

  if errors.len > 0:
    return err("subscribeShard errors: " & errors.join("; "))

  return ok()

proc getShardForContentTopic(
    self: SubscriptionManager, topic: ContentTopic
): Result[PubsubTopic, string] =
  ## Resolve the pubsub topic (shard) for `topic` via the node's AutoSharding.
  ## Fails when AutoSharding is not configured or cannot shard the topic.
  if self.node.wakuAutoSharding.isSome():
    let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
    return ok($shardObj)

  return err("SubscriptionManager requires AutoSharding")

proc isSubscribed*(
    self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
): bool {.raises: [].} =
  ## True when `contentTopic` is registered as an interest on `shard`.
  self.contentTopicSubs.withValue(shard, cTopics):
    return cTopics[].contains(contentTopic)
  return false

proc isSubscribed*(
    self: SubscriptionManager, topic: ContentTopic
): Result[bool, string] =
  ## True when `topic` is registered on its auto-sharded shard.
  ## Fails only if the shard cannot be resolved.
  let shard = ?self.getShardForContentTopic(topic)
  # Delegate to the raises-free overload: one table lookup and no KeyError path.
  return ok(self.isSubscribed(shard, topic))

proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] =
  ## Register interest in `topic`. With Relay mounted, the topic's shard is
  ## first subscribed at the pubsub level if not already tracked.
  if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
    return err("SubscriptionManager requires either Relay or Filter Client.")

  let shard = ?self.getShardForContentTopic(topic)

  if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard):
    ?self.subscribePubsubTopics(@[shard])

  ?self.addContentTopicInterest(shard, topic)

  return ok()

proc unsubscribe*(
    self: SubscriptionManager, topic: ContentTopic
): Result[void, string] =
  ## Drop interest in `topic`. This is a no-op if it was not subscribed.
  if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
    return err("SubscriptionManager requires either Relay or Filter Client.")

  let shard = ?self.getShardForContentTopic(topic)

  if self.isSubscribed(shard, topic):
    ?self.removeContentTopicInterest(shard, topic)

  return ok()
# ---------------------------------------------------------------------------
# Edge Filter driver for the Logos Messaging API
#
# The SubscriptionManager absorbs natively the responsibility of using the
# Edge Filter protocol to effect subscriptions and message receipt for edge.
# ---------------------------------------------------------------------------

const EdgeFilterSubscribeTimeout = chronos.seconds(15)
  ## Timeout for a single filter subscribe/unsubscribe RPC to a service peer.
const EdgeFilterPingTimeout = chronos.seconds(5)
  ## Timeout for a filter ping health check.
const EdgeFilterLoopInterval = chronos.seconds(30)
  ## Interval for the edge filter health ping loop.
const EdgeFilterSubLoopDebounce = chronos.seconds(1)
  ## Debounce delay to coalesce rapid-fire wakeups into a single reconciliation pass.

proc updateShardHealth(
    self: SubscriptionManager, shard: PubsubTopic, state: var EdgeFilterSubState
) =
  ## Recompute a shard's health from its peer count. Store it and emit
  ## EventShardTopicHealthChange only when the value actually changed.
  let newHealth = toTopicHealth(state.peers.len)
  if newHealth != state.currentHealth:
    state.currentHealth = newHealth
    EventShardTopicHealthChange.emit(self.node.brokerCtx, shard, newHealth)

proc removePeer(self: SubscriptionManager, shard: PubsubTopic, peerId: PeerId) =
  ## Stop tracking `peerId` as a confirmed filter service peer for `shard`.
  ##
  ## Does nothing if the peer is not in the shard's confirmed peer set.
  ## Otherwise it:
  ## - drops the peer,
  ## - re-emits shard health,
  ## - wakes the sub loop so it can dial a replacement,
  ## - fires a best-effort unsubscribe for the shard's current content topics,
  ##   so the service peer stops pushing to us.
  self.edgeFilterSubStates.withValue(shard, state):
    var peer: RemotePeerInfo
    var found = false
    for p in state.peers:
      if p.peerId == peerId:
        peer = p
        found = true
        break
    if not found:
      return

    state.peers.keepItIf(it.peerId != peerId)
    self.updateShardHealth(shard, state[])
    if not isNil(self.edgeFilterWakeup):
      self.edgeFilterWakeup.fire()

    let filterClient = self.node.wakuFilterClient
    if filterClient.isNil():
      return

    self.contentTopicSubs.withValue(shard, topics):
      let ct = toSeq(topics[])
      if ct.len > 0:
        # asyncSpawn turns any failure or cancellation of the spawned future
        # into a FutureDefect (a process crash), so this best-effort RPC must
        # never raise. It is also bounded by a timeout, so an unresponsive
        # peer cannot keep the task pending forever.
        proc doUnsubscribe() {.async: (raises: []).} =
          try:
            let fut = filterClient.unsubscribe(peer, shard, ct)
            if not await fut.withTimeout(EdgeFilterSubscribeTimeout):
              trace "removePeer: best-effort unsubscribe timed out",
                shard = shard, peer = peer.peerId
          except CatchableError as exc:
            trace "removePeer: best-effort unsubscribe failed",
              shard = shard, peer = peer.peerId, err = exc.msg

        asyncSpawn doUnsubscribe()

type SendChunkedFilterRpcKind = enum
  FilterSubscribe
  FilterUnsubscribe

proc sendChunkedFilterRpc(
    self: SubscriptionManager,
    peer: RemotePeerInfo,
    shard: PubsubTopic,
    topics: seq[ContentTopic],
    kind: SendChunkedFilterRpcKind,
): Future[bool] {.async.} =
  ## Send a filter subscribe or unsubscribe RPC for `topics`. Topics are split
  ## into chunks of at most `filter_protocol.MaxContentTopicsPerRequest`.
  ## Returns true when every chunk succeeds.
  ##
  ## On a failed, timed-out or erroring chunk, the peer is removed via
  ## removePeer and false is returned. Cancelling this future is not treated
  ## as a peer failure: CancelledError is propagated to the caller.
  try:
    var i = 0
    while i < topics.len:
      let chunk =
        topics[i ..< min(i + filter_protocol.MaxContentTopicsPerRequest, topics.len)]
      let fut =
        case kind
        of FilterSubscribe:
          self.node.wakuFilterClient.subscribe(peer, shard, chunk)
        of FilterUnsubscribe:
          self.node.wakuFilterClient.unsubscribe(peer, shard, chunk)

      let inTime = await fut.withTimeout(EdgeFilterSubscribeTimeout)
      # A cancelled RPC future counts as a chunk failure. A future that failed
      # with an exception raises from read() and is handled below.
      if not inTime or fut.cancelled() or fut.read().isErr():
        trace "sendChunkedFilterRpc: chunk failed",
          op = kind, shard = shard, peer = peer.peerId
        self.removePeer(shard, peer.peerId)
        return false

      i += filter_protocol.MaxContentTopicsPerRequest
  except CancelledError as exc:
    # Our own cancellation (e.g. a pending dial cancelled because the shard
    # lost all interest, or shutdown) says nothing about the peer's health.
    raise exc
  except CatchableError as exc:
    debug "sendChunkedFilterRpc: failed",
      op = kind, shard = shard, peer = peer.peerId, err = exc.msg
    self.removePeer(shard, peer.peerId)
    return false

  return true

proc syncFilterDeltas(
    self: SubscriptionManager,
    peer: RemotePeerInfo,
    shard: PubsubTopic,
    added: seq[ContentTopic],
    removed: seq[ContentTopic],
) {.async.} =
  ## Push content topic changes (adds/removes) to an already-tracked peer.
  ## Removals are skipped if the subscribe step failed, because in that case
  ## the peer has already been dropped.
  if added.len > 0:
    if not await self.sendChunkedFilterRpc(peer, shard, added, FilterSubscribe):
      return

  if removed.len > 0:
    discard await self.sendChunkedFilterRpc(peer, shard, removed, FilterUnsubscribe)

proc dialFilterPeer(
    self: SubscriptionManager,
    peer: RemotePeerInfo,
    shard: PubsubTopic,
    contentTopics: seq[ContentTopic],
) {.async.} =
  ## Subscribe a new peer to all content topics on a shard and start tracking it.
  ## The peer is listed in `pendingPeers` while the subscription is in flight;
  ## the `finally` clears that mark on success, failure and cancellation.
  self.edgeFilterSubStates.withValue(shard, state):
    state.pendingPeers.incl(peer.peerId)

  try:
    if not await self.sendChunkedFilterRpc(peer, shard, contentTopics, FilterSubscribe):
      return

    self.edgeFilterSubStates.withValue(shard, state):
      if state.peers.anyIt(it.peerId == peer.peerId):
        trace "dialFilterPeer: peer already tracked, skipping duplicate",
          shard = shard, peer = peer.peerId
        return

      state.peers.add(peer)
      self.updateShardHealth(shard, state[])
      trace "dialFilterPeer: successfully subscribed to all chunks",
        shard = shard, peer = peer.peerId, totalPeers = state.peers.len
    do:
      trace "dialFilterPeer: shard removed while subscribing, discarding result",
        shard = shard, peer = peer.peerId
  finally:
    self.edgeFilterSubStates.withValue(shard, state):
      state.pendingPeers.excl(peer.peerId)

proc edgeFilterHealthLoop*(self: SubscriptionManager) {.async.} =
  ## Every `EdgeFilterLoopInterval`, ping each connected filter service peer
  ## to check it is still alive at the application layer.
  ## - Connected peers whose ping does not succeed are removed from all shards.
  ## - Disconnected peers are left for edgeFilterSubLoop to prune.
  ## - The sub loop is woken whenever any peer is removed.
  while true:
    await sleepAsync(EdgeFilterLoopInterval)

    if self.node.wakuFilterClient.isNil():
      warn "filter client is nil within edge filter health loop"
      continue

    var connected = initTable[PeerId, RemotePeerInfo]()
    for state in self.edgeFilterSubStates.values:
      for peer in state.peers:
        if self.node.peerManager.switch.peerStore.isConnected(peer.peerId):
          connected[peer.peerId] = peer

    var alive = initHashSet[PeerId]()

    if connected.len > 0:
      var pingTasks: seq[(PeerId, Future[FilterSubscribeResult])] = @[]
      for peer in connected.values:
        pingTasks.add(
          (peer.peerId, self.node.wakuFilterClient.ping(peer, EdgeFilterPingTimeout))
        )

      # extract future tasks from (PeerId, Future) tuples and await them
      await allFutures(pingTasks.mapIt(it[1]))

      for (peerId, task) in pingTasks:
        # Only read futures that completed successfully. read() on a future
        # that failed with an exception re-raises it, which would escape this
        # loop and stop health checking for good. A failed ping means "not alive".
        if task.completed() and task.read().isOk():
          alive.incl(peerId)

    var changed = false
    for shard, state in self.edgeFilterSubStates.mpairs:
      let oldLen = state.peers.len
      state.peers.keepItIf(it.peerId notin connected or alive.contains(it.peerId))

      if state.peers.len < oldLen:
        changed = true
        self.updateShardHealth(shard, state)
        trace "Edge Filter health degraded by Ping failure",
          shard = shard, new = state.currentHealth

    if changed:
      self.edgeFilterWakeup.fire()

proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
  ## Reconciles filter subscriptions with the desired state from SubscriptionManager.
  ##
  ## On each wakeup (debounced by `EdgeFilterSubLoopDebounce`), it diffs the
  ## current content topic interest against the last synced snapshot. For each shard:
  ## - prunes disconnected peers and finished dials,
  ## - pushes topic deltas to tracked peers,
  ## - tears down shards with no remaining interest,
  ## - dials new candidates until `HealthyThreshold` peers are tracked or pending.
  var lastSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()

  while true:
    await self.edgeFilterWakeup.wait()
    await sleepAsync(EdgeFilterSubLoopDebounce)
    self.edgeFilterWakeup.clear()
    trace "edgeFilterSubLoop: woke up"

    if isNil(self.node.wakuFilterClient):
      trace "edgeFilterSubLoop: wakuFilterClient is nil, skipping"
      continue

    # Snapshot: later mutations of contentTopicSubs trigger another wakeup.
    let desired = self.contentTopicSubs

    trace "edgeFilterSubLoop: desired state", numShards = desired.len

    let allShards = toHashSet(toSeq(desired.keys)) + toHashSet(toSeq(lastSynced.keys))

    for shard in allShards:
      let currTopics = desired.getOrDefault(shard)
      let prevTopics = lastSynced.getOrDefault(shard)

      if shard notin self.edgeFilterSubStates:
        self.edgeFilterSubStates[shard] =
          EdgeFilterSubState(currentHealth: TopicHealth.UNHEALTHY)

      let addedTopics = toSeq(currTopics - prevTopics)
      let removedTopics = toSeq(prevTopics - currTopics)

      self.edgeFilterSubStates.withValue(shard, state):
        state.peers.keepItIf(
          self.node.peerManager.switch.peerStore.isConnected(it.peerId)
        )
        state.pending.keepItIf(not it.finished)

        if addedTopics.len > 0 or removedTopics.len > 0:
          for peer in state.peers:
            asyncSpawn self.syncFilterDeltas(peer, shard, addedTopics, removedTopics)

        if currTopics.len == 0:
          # No interest left on this shard: abort in-flight dials, then drop it.
          for fut in state.pending:
            if not fut.finished:
              await fut.cancelAndWait()
          self.edgeFilterSubStates.del(shard)
            # invalidates `state` — do not use after this
        else:
          self.updateShardHealth(shard, state[])

          # Pending dials count toward the target so we do not over-dial.
          let needed = max(0, HealthyThreshold - state.peers.len - state.pending.len)

          if needed > 0:
            let tracked = state.peers.mapIt(it.peerId).toHashSet() + state.pendingPeers
            var candidates = self.node.peerManager.selectPeers(
              filter_common.WakuFilterSubscribeCodec, some(shard)
            )
            candidates.keepItIf(it.peerId notin tracked)

            let toDial = min(needed, candidates.len)

            trace "edgeFilterSubLoop: shard reconciliation",
              shard = shard,
              num_peers = state.peers.len,
              num_pending = state.pending.len,
              num_needed = needed,
              num_available = candidates.len,
              toDial = toDial

            for i in 0 ..< toDial:
              let fut = self.dialFilterPeer(candidates[i], shard, toSeq(currTopics))
              state.pending.add(fut)

    lastSynced = desired

proc startEdgeFilterLoops(self: SubscriptionManager): Result[void, string] =
  ## Start the edge filter orchestration loops.
  ## Caller must ensure this is only called in edge mode (relay nil, filter client present).
  ## Peer disconnect and metadata-update events wake the reconciliation loop.
  self.edgeFilterWakeup = newAsyncEvent()

  self.peerEventListener = WakuPeerEvent.listen(
    self.node.brokerCtx,
    proc(evt: WakuPeerEvent) {.async: (raises: []), gcsafe.} =
      if evt.kind == WakuPeerEventKind.EventDisconnected or
          evt.kind == WakuPeerEventKind.EventMetadataUpdated:
        self.edgeFilterWakeup.fire()
    ,
  ).valueOr:
    return err("Failed to listen to peer events for edge filter: " & error)

  self.edgeFilterSubLoopFut = self.edgeFilterSubLoop()
  self.edgeFilterHealthLoopFut = self.edgeFilterHealthLoop()

  return ok()

proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} =
  ## Stop the edge filter orchestration loops and clean up pending futures.
  ## The loops are cancelled first, so no new dials start while pending ones
  ## are being cancelled.
  if not isNil(self.edgeFilterSubLoopFut):
    await self.edgeFilterSubLoopFut.cancelAndWait()
    self.edgeFilterSubLoopFut = nil

  if not isNil(self.edgeFilterHealthLoopFut):
    await self.edgeFilterHealthLoopFut.cancelAndWait()
    self.edgeFilterHealthLoopFut = nil

  for shard, state in self.edgeFilterSubStates:
    for fut in state.pending:
      if not fut.finished:
        await fut.cancelAndWait()

  WakuPeerEvent.dropListener(self.node.brokerCtx, self.peerEventListener)

# ---------------------------------------------------------------------------
# SubscriptionManager Lifecycle (calls Edge behavior above)
#
# startSubscriptionManager and stopSubscriptionManager orchestrate both the
# core (relay) and edge (filter) paths, and register/clear broker providers.
# ---------------------------------------------------------------------------

proc startSubscriptionManager*(self: SubscriptionManager): Result[void, string] =
  ## Register broker providers, then start the mode-specific behavior:
  ## - edge mode (no Relay): start the edge filter loops;
  ## - core mode: subscribe Relay to every generation-zero shard of the
  ##   auto-sharding cluster.
  # Register edge filter broker providers. The shard/content health providers
  # in WakuNode query these via the broker as a fallback when relay health is
  # not available. If edge mode is not active, these providers simply return
  # NOT_SUBSCRIBED / strength 0, which is harmless.
  RequestEdgeShardHealth.setProvider(
    self.node.brokerCtx,
    proc(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] =
      self.edgeFilterSubStates.withValue(shard, state):
        return ok(RequestEdgeShardHealth(health: state.currentHealth))
      return ok(RequestEdgeShardHealth(health: TopicHealth.NOT_SUBSCRIBED)),
  ).isOkOr:
    error "Can't set provider for RequestEdgeShardHealth", error = error

  RequestEdgeFilterPeerCount.setProvider(
    self.node.brokerCtx,
    proc(): Result[RequestEdgeFilterPeerCount, string] =
      # Report the weakest shard: the minimum peer count across shards.
      var minPeers = high(int)
      for state in self.edgeFilterSubStates.values:
        minPeers = min(minPeers, state.peers.len)
      if minPeers == high(int):
        minPeers = 0
      return ok(RequestEdgeFilterPeerCount(peerCount: minPeers)),
  ).isOkOr:
    error "Can't set provider for RequestEdgeFilterPeerCount", error = error

  if self.node.wakuRelay.isNil():
    return self.startEdgeFilterLoops()

  # Core mode: auto-subscribe relay to all shards in autosharding.
  if self.node.wakuAutoSharding.isSome():
    let autoSharding = self.node.wakuAutoSharding.get()
    let clusterId = autoSharding.clusterId
    let numShards = autoSharding.shardCountGenZero

    if numShards > 0:
      var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)

      for i in 0 ..< numShards:
        let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
        clusterPubsubTopics.add(PubsubTopic($shardObj))

      self.subscribePubsubTopics(clusterPubsubTopics).isOkOr:
        error "Failed to auto-subscribe Relay to cluster shards: ", error = error
  else:
    info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe."

  return ok()

proc stopSubscriptionManager*(self: SubscriptionManager) {.async: (raises: []).} =
  ## Stop the edge filter loops (edge mode only) and unregister the broker
  ## providers installed by startSubscriptionManager.
  if self.node.wakuRelay.isNil():
    await self.stopEdgeFilterLoops()

  RequestEdgeShardHealth.clearProvider(self.node.brokerCtx)
  RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx)