mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-04 13:09:32 +00:00
904 lines
33 KiB
Nim
904 lines
33 KiB
Nim
## Subscription engine: content-topic interest tracking, relay-mode pubsub
## subscription bookkeeping, and edge-mode filter peer subscription maintenance.
## Type bodies live in ./waku_node.nim.
|
|
|
|
import std/[sequtils, sets, tables, options], chronos, chronicles, results, metrics
|
|
import libp2p/[peerid, peerinfo]
|
|
import brokers/broker_context
|
|
|
|
import
|
|
waku/[
|
|
waku_core,
|
|
waku_core/topics,
|
|
waku_core/topics/sharding,
|
|
waku_node,
|
|
waku_relay,
|
|
waku_filter_v2/common as filter_common,
|
|
waku_filter_v2/protocol as filter_protocol,
|
|
waku_archive,
|
|
waku_store_sync,
|
|
api/events/health,
|
|
events/peer_events,
|
|
api/events/message,
|
|
api/requests/health,
|
|
requests/health_requests,
|
|
node/peer_manager,
|
|
node/health_monitor/topic_health,
|
|
node/health_monitor/connection_status,
|
|
]
|
|
import waku/api/requests/filter as kernel_filter_api
|
|
import waku/api/requests/subscription
|
|
|
|
func toTopicHealth*(peersCount: int): TopicHealth =
  ## Classify a shard's health from the number of peers serving it.
  ## The count maps to a level as follows:
  ## - at least `HealthyThreshold` peers is SUFFICIENTLY_HEALTHY;
  ## - otherwise any positive count is MINIMALLY_HEALTHY;
  ## - anything else is UNHEALTHY.
  result = TopicHealth.UNHEALTHY
  if peersCount >= HealthyThreshold:
    result = TopicHealth.SUFFICIENTLY_HEALTHY
  elif peersCount > 0:
    result = TopicHealth.MINIMALLY_HEALTHY
|
|
|
|
proc isRelayMounted(self: WakuSubscriptionManager): bool =
  ## True when the node has a relay protocol instance attached.
  result = not isNil(self.node.wakuRelay)
|
|
|
|
proc isFilterMounted(self: WakuSubscriptionManager): bool =
  ## True when the node has a filter client instance attached.
  result = not isNil(self.node.wakuFilterClient)
|
|
|
|
iterator relaySubscribedTopics*(
    self: WakuSubscriptionManager
): (PubsubTopic, HashSet[ContentTopic]) =
  ## Yield each shard together with its relay content-topic set.
  ## Shards whose set is empty are not yielded.
  for shard, contentTopics in self.relayContentTopicSubs.pairs:
    if contentTopics.len > 0:
      yield (shard, contentTopics)
|
|
|
|
iterator edgeSubscribedTopics*(
    self: WakuSubscriptionManager
): (PubsubTopic, HashSet[ContentTopic]) =
  ## Yield each shard together with its edge (filter) content-topic set.
  ## Shards whose set is empty are not yielded.
  for shard, contentTopics in self.edgeContentTopicSubs.pairs:
    if contentTopics.len > 0:
      yield (shard, contentTopics)
|
|
|
|
proc edgeFilterPeerCount*(sm: WakuSubscriptionManager, shard: PubsubTopic): int =
  ## Number of filter service peers currently tracked for `shard`.
  ## Returns 0 when the shard has no edge filter state.
  result = 0
  sm.edgeFilterSubStates.withValue(shard, subState):
    result = subState.peers.len
|
|
|
|
proc new*(T: typedesc[WakuSubscriptionManager], node: WakuNode): T =
  ## Create a manager bound to `node`.
  ## Relay and edge content-topic interest start empty and there are no direct
  ## shard subscriptions; every other field starts at its zero value.
  let emptyInterest = initTable[PubsubTopic, HashSet[ContentTopic]]()
  result = T(
    node: node,
    relayContentTopicSubs: emptyInterest,
    edgeContentTopicSubs: emptyInterest,
    directShardSubs: initHashSet[PubsubTopic](),
  )
|
|
|
|
# Relay mesh subscription bookkeeping
|
|
|
|
proc registerRelayHandler(
    self: WakuSubscriptionManager,
    shard: PubsubTopic,
    appHandler: WakuRelayHandler = nil,
): bool =
  ## Attach the shared fan-out handler to `shard` on the relay mesh.
  ##
  ## Returns true only when this call created a new mesh subscription.
  ## When the shard is already subscribed it returns false. In that case a
  ## non-nil `appHandler` is stored only if no legacy app handler is recorded
  ## for the shard yet; otherwise it is ignored and a debug line is logged.
  ##
  ## For every message, the fan-out handler does the following in order:
  ## - updates the relay message counter and the message-size histogram (KB);
  ## - passes the message to the filter protocol, the archive and store-sync,
  ##   each only when mounted;
  ## - emits `MessageSeenEvent`;
  ## - awaits the kernel-API app handler registered for the topic, if any.
  let node = self.node
  let alreadySubscribed = node.wakuRelay.isSubscribed(shard)

  if not appHandler.isNil():
    let mayRecord = not alreadySubscribed or not node.legacyAppHandlers.hasKey(shard)
    if mayRecord:
      node.legacyAppHandlers[shard] = appHandler
    else:
      debug "Legacy appHandler already exists for active shard, ignoring new handler",
        shard = shard

  if alreadySubscribed:
    return false

  proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    waku_node_messages.inc(labelValues = ["relay"])
    waku_histogram_message_size.observe(msg.payload.len / 1000)

  proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    if not node.wakuFilter.isNil():
      await node.wakuFilter.handleMessage(topic, msg)

  proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    if not node.wakuArchive.isNil():
      await node.wakuArchive.handleMessage(topic, msg)

  proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    if not node.wakuStoreReconciliation.isNil():
      node.wakuStoreReconciliation.messageIngress(topic, msg)

  proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
    MessageSeenEvent.emit(node.brokerCtx, topic, msg)

  let uniqueTopicHandler = proc(
      topic: PubsubTopic, msg: WakuMessage
  ): Future[void] {.async, gcsafe.} =
    await traceHandler(topic, msg)
    await filterHandler(topic, msg)
    await archiveHandler(topic, msg)
    await syncHandler(topic, msg)
    await internalHandler(topic, msg)

    # Finish with the kernel-API app handler, when one is registered.
    if node.legacyAppHandlers.hasKey(topic):
      let appFn = node.legacyAppHandlers[topic]
      if not appFn.isNil():
        await appFn(topic, msg)

  node.wakuRelay.subscribe(shard, uniqueTopicHandler)
  return true
|
|
|
|
proc meshSubscribe(
    self: WakuSubscriptionManager, shard: PubsubTopic, handler: WakuRelayHandler = nil
) =
  ## Subscribe the relay mesh to `shard` unless it already is; safe to repeat.
  ## A `PubsubSub` notification is queued only when a new mesh subscription was
  ## actually created.
  let createdNow = self.registerRelayHandler(shard, handler)
  if not createdNow:
    return
  self.node.topicSubscriptionQueue.emit((kind: SubscriptionKind.PubsubSub, topic: shard))
|
|
|
|
proc meshUnsubscribe(self: WakuSubscriptionManager, shard: PubsubTopic) =
  ## Drop any legacy app handler stored for `shard`, then leave its relay mesh.
  ## A `PubsubUnsub` notification is queued only if relay reported the shard as
  ## subscribed.
  let node = self.node
  if node.legacyAppHandlers.hasKey(shard):
    node.legacyAppHandlers.del(shard)

  if not node.wakuRelay.isSubscribed(shard):
    return
  node.wakuRelay.unsubscribe(shard)
  node.topicSubscriptionQueue.emit((kind: SubscriptionKind.PubsubUnsub, topic: shard))
|
|
|
|
proc held(self: WakuSubscriptionManager, shard: PubsubTopic): bool =
  ## Whether something still needs the relay-mesh subscription on `shard`.
  ## That is the case when there is a direct shard subscription or a non-empty
  ## relay content-topic set. Edge (filter) interest is deliberately ignored.
  result = shard in self.directShardSubs
  if not result:
    result = self.relayContentTopicSubs.getOrDefault(shard).len > 0
|
|
|
|
proc resolveShard(
    self: WakuSubscriptionManager,
    topic: ContentTopic,
    shardOp: Option[PubsubTopic],
): Result[PubsubTopic, string] =
  ## Map a content topic to the pubsub topic (shard) it belongs to.
  ## An explicit `shardOp` is handed to `deduceRelayShard`; it is required under
  ## static or manual sharding. Without it the shard is derived by
  ## auto-sharding. Errors from `deduceRelayShard` are returned unchanged.
  let relayShard = deduceRelayShard(self.node, topic, shardOp).valueOr:
    return err(error)
  result = ok(PubsubTopic($relayShard))
|
|
|
|
# Relay content-topic interest
|
|
|
|
proc addRelayContentTopicInterest(
    self: WakuSubscriptionManager, shard: PubsubTopic, topic: ContentTopic
) =
  ## Record relay interest in `topic` on `shard`.
  ## The shard's set is created on first use; re-adding a present topic is a
  ## no-op.
  self.relayContentTopicSubs.mgetOrPut(shard, initHashSet[ContentTopic]()).incl(topic)
|
|
|
|
proc removeRelayContentTopicInterest(
    self: WakuSubscriptionManager, shard: PubsubTopic, topic: ContentTopic
) =
  ## Drop relay interest in `topic` on `shard`.
  ## The shard entry itself is deleted once its content-topic set becomes empty.
  var nowEmpty = false
  self.relayContentTopicSubs.withValue(shard, subs):
    subs[].excl(topic)
    nowEmpty = subs[].len == 0
  # Delete after leaving withValue so no pointer into the table is live.
  if nowEmpty:
    self.relayContentTopicSubs.del(shard)
|
|
|
|
# Edge content-topic interest (drives the filter maintenance loop)
|
|
|
|
proc addEdgeContentTopicInterest(
    self: WakuSubscriptionManager, shard: PubsubTopic, topic: ContentTopic
) =
  ## Record edge (filter) interest in `topic` on `shard`.
  ## When the topic was not recorded before, the edge filter sub loop is woken
  ## (if its wakeup event exists yet) so it reconciles filter subscriptions.
  let alreadyKnown = containsOrIncl(
    self.edgeContentTopicSubs.mgetOrPut(shard, initHashSet[ContentTopic]()), topic
  )
  if alreadyKnown or isNil(self.edgeFilterWakeup):
    return
  self.edgeFilterWakeup.fire()
|
|
|
|
proc removeEdgeContentTopicInterest(
    self: WakuSubscriptionManager, shard: PubsubTopic, topic: ContentTopic
) =
  ## Drop edge interest in `topic` on `shard`.
  ## The shard entry is deleted once its set is empty. The sub loop is woken
  ## (when its wakeup event exists) only if the topic was actually present.
  var removed = false
  var nowEmpty = false
  self.edgeContentTopicSubs.withValue(shard, subs):
    removed = not subs[].missingOrExcl(topic)
    nowEmpty = subs[].len == 0
  if nowEmpty:
    self.edgeContentTopicSubs.del(shard)
  if removed and not isNil(self.edgeFilterWakeup):
    self.edgeFilterWakeup.fire()
|
|
|
|
proc isRelaySubscribed*(
    self: WakuSubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
): bool {.raises: [].} =
  ## Whether relay interest in `contentTopic` is recorded on `shard`.
  result = false
  self.relayContentTopicSubs.withValue(shard, subs):
    result = contentTopic in subs[]
|
|
|
|
proc isEdgeSubscribed*(
    self: WakuSubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
): bool {.raises: [].} =
  ## Whether edge (filter) interest in `contentTopic` is recorded on `shard`.
  result = false
  self.edgeContentTopicSubs.withValue(shard, subs):
    result = contentTopic in subs[]
|
|
|
|
# The four-operation subscription surface.
# subscribeShard/unsubscribeShard: direct (0/1) shard interest.
# relaySubscribeContentTopic/relayUnsubscribeContentTopic (relay) and
# edgeSubscribe/edgeUnsubscribe (filter): per-content-topic interest.
# Content-topic ops take an optional shard: it is derived under auto-sharding
# and must be supplied under static/manual sharding. A shard's relay-mesh
# subscription is held while a direct shard subscription or any relay
# content-topic interest keeps it (edge interest does not). The pubsub topic
# is torn down when nothing holds it.
|
|
|
|
proc subscribeShard*(
    self: WakuSubscriptionManager,
    shard: PubsubTopic,
    handler: WakuRelayHandler = nil,
): Result[void, string] =
  ## Record a direct subscription to `shard`.
  ## When relay is mounted, the relay mesh is also joined (idempotently),
  ## optionally registering `handler` as the shard's app handler.
  ## Fails if neither relay nor the filter client is mounted.
  let relayOn = self.isRelayMounted()
  if not (relayOn or self.isFilterMounted()):
    return err("WakuSubscriptionManager requires either Relay or Filter Client.")

  self.directShardSubs.incl(shard)
  if relayOn:
    self.meshSubscribe(shard, handler)
  result = ok()
|
|
|
|
proc unsubscribeShard*(
    self: WakuSubscriptionManager, shard: PubsubTopic
): Result[void, string] =
  ## Remove the direct subscription to `shard`.
  ## The relay-mesh subscription is torn down only when relay is mounted and no
  ## relay content-topic interest still holds the shard.
  ## Fails if neither relay nor the filter client is mounted.
  let relayOn = self.isRelayMounted()
  if not (relayOn or self.isFilterMounted()):
    return err("WakuSubscriptionManager requires either Relay or Filter Client.")

  self.directShardSubs.excl(shard)
  if relayOn and not self.held(shard):
    self.meshUnsubscribe(shard)
  result = ok()
|
|
|
|
# Relay content-topic subscription (gossipsub mesh)
|
|
|
|
proc relaySubscribeContentTopic*(
    self: WakuSubscriptionManager,
    topic: ContentTopic,
    shardOp: Option[PubsubTopic] = none[PubsubTopic](),
): Result[void, string] =
  ## Join the relay mesh for the shard of `topic` and record relay interest in
  ## `topic` there. The shard is resolved via `resolveShard`.
  ## Requires relay to be mounted; shard-resolution errors are returned as-is.
  if not self.isRelayMounted():
    return err("relaySubscribeContentTopic requires Relay mounted.")

  let shard = self.resolveShard(topic, shardOp).valueOr:
    return err(error)
  self.meshSubscribe(shard, nil)
  self.addRelayContentTopicInterest(shard, topic)
  result = ok()
|
|
|
|
proc relayUnsubscribeContentTopic*(
    self: WakuSubscriptionManager,
    topic: ContentTopic,
    shardOp: Option[PubsubTopic] = none[PubsubTopic](),
): Result[void, string] =
  ## Drop relay interest in `topic` on its resolved shard.
  ## The relay mesh for that shard is left only once nothing holds it (see
  ## `held`). Requires relay to be mounted; shard-resolution errors are
  ## returned as-is.
  if not self.isRelayMounted():
    return err("relayUnsubscribeContentTopic requires Relay mounted.")

  let shard = self.resolveShard(topic, shardOp).valueOr:
    return err(error)
  self.removeRelayContentTopicInterest(shard, topic)

  let stillNeeded = self.held(shard)
  if not stillNeeded:
    self.meshUnsubscribe(shard)
  result = ok()
|
|
|
|
# Edge content-topic subscription (filter; driver reconciles peers)
|
|
|
|
proc edgeSubscribe*(
    self: WakuSubscriptionManager,
    topic: ContentTopic,
    shardOp: Option[PubsubTopic] = none[PubsubTopic](),
): Result[void, string] =
  ## Record edge (filter) interest in `topic` on its resolved shard.
  ## The edge filter sub loop turns that interest into filter subscriptions.
  ## Requires the filter client to be mounted.
  if not self.isFilterMounted():
    return err("edgeSubscribe requires a Filter Client mounted.")

  let shard = self.resolveShard(topic, shardOp).valueOr:
    return err(error)
  self.addEdgeContentTopicInterest(shard, topic)
  result = ok()
|
|
|
|
proc edgeUnsubscribe*(
    self: WakuSubscriptionManager,
    topic: ContentTopic,
    shardOp: Option[PubsubTopic] = none[PubsubTopic](),
): Result[void, string] =
  ## Drop edge (filter) interest in `topic` on its resolved shard.
  ## Requires the filter client to be mounted.
  if not self.isFilterMounted():
    return err("edgeUnsubscribe requires a Filter Client mounted.")

  let shard = self.resolveShard(topic, shardOp).valueOr:
    return err(error)
  self.removeEdgeContentTopicInterest(shard, topic)
  result = ok()
|
|
|
|
# Edge Filter driver
|
|
|
|
const
  EdgeFilterSubscribeTimeout = chronos.seconds(15)
    ## Timeout for a single filter subscribe/unsubscribe RPC to a service peer.
  EdgeFilterPingTimeout = chronos.seconds(5)
    ## Timeout for a filter ping.
  EdgeFilterLoopInterval = chronos.seconds(30)
    ## Sleep between passes of the edge filter maintenance (ping) loop.
  EdgeFilterSubLoopDebounce = chronos.seconds(1)
    ## Debounce delay to coalesce wakeups into a single reconciliation pass.
|
|
|
|
type EdgeDialTask = object ## A filter dial queued by the sub loop for later execution.
  peer: RemotePeerInfo ## filter service peer to subscribe with
  shard: PubsubTopic ## pubsub topic (shard) the subscription is for
  topics: seq[ContentTopic] ## content topics to subscribe on that shard
|
|
|
|
proc updateShardHealth(
    self: WakuSubscriptionManager, shard: PubsubTopic, state: var EdgeFilterSubState
) =
  ## Derive `shard`'s health from the number of peers tracked in `state`.
  ## When it differs from `state.currentHealth`, store the new value and emit
  ## `EventShardTopicHealthChange`; otherwise do nothing.
  let health = toTopicHealth(state.peers.len)
  if health == state.currentHealth:
    return
  state.currentHealth = health
  EventShardTopicHealthChange.emit(self.node.brokerCtx, shard, health)
|
|
|
|
proc removePeer(self: WakuSubscriptionManager, shard: PubsubTopic, peerId: PeerId) =
  ## Stop tracking `peerId` as a filter service peer for `shard`.
  ##
  ## When the peer is tracked for the shard:
  ## - it is removed from the shard's peer list;
  ## - the shard's health is recomputed, possibly emitting a health event;
  ## - the edge filter sub loop is woken so it can dial a replacement.
  ## Then, if the filter client is mounted and the shard still has edge
  ## content-topic interest, a best-effort filter unsubscribe for those topics
  ## is sent to the peer in a background task. Request results are ignored.
  ## Does nothing when the shard has no state or the peer is not tracked there.
  self.edgeFilterSubStates.withValue(shard, state):
    # Keep the full peer record: the unsubscribe below needs more than the id.
    var peer: RemotePeerInfo
    var found = false
    for p in state.peers:
      if p.peerId == peerId:
        peer = p
        found = true
        break
    if not found:
      return

    state.peers.keepItIf(it.peerId != peerId)
    self.updateShardHealth(shard, state[])
    # Guarded like the other wakeup call sites, in case the loops never started.
    if not isNil(self.edgeFilterWakeup):
      self.edgeFilterWakeup.fire()

    if self.isFilterMounted():
      self.edgeContentTopicSubs.withValue(shard, topics):
        # Snapshot the topics now; the background task runs later.
        let ct = toSeq(topics[])
        if ct.len > 0:
          let brokerCtx = self.node.brokerCtx
          proc doUnsubscribe() {.async.} =
            # Send at most MaxContentTopicsPerRequest topics per request, like
            # sendChunkedFilterRpc does. A single oversized request may be
            # rejected by the service peer and leave the peer subscribed.
            var i = 0
            while i < ct.len:
              let chunk =
                ct[i ..< min(i + filter_protocol.MaxContentTopicsPerRequest, ct.len)]
              discard await kernel_filter_api.RequestFilterUnsubscribe.request(
                brokerCtx, peer, shard, chunk
              )
              i += filter_protocol.MaxContentTopicsPerRequest

          asyncSpawn doUnsubscribe()
|
|
|
|
type SendChunkedFilterRpcKind = enum ## Which filter RPC sendChunkedFilterRpc issues per chunk.
  FilterSubscribe ## subscribe the peer to each chunk of content topics
  FilterUnsubscribe ## unsubscribe the peer from each chunk of content topics
|
|
|
|
proc sendChunkedFilterRpc(
    self: WakuSubscriptionManager,
    peer: RemotePeerInfo,
    shard: PubsubTopic,
    topics: seq[ContentTopic],
    kind: SendChunkedFilterRpcKind,
): Future[bool] {.async.} =
  ## Issue a filter subscribe or unsubscribe (per `kind`) to `peer` for `topics`
  ## on `shard`.
  ##
  ## Topics are sent in sequential requests of at most
  ## `filter_protocol.MaxContentTopicsPerRequest` each, and every request is
  ## bounded by `EdgeFilterSubscribeTimeout`.
  ##
  ## Returns true when all chunks succeed (trivially true for empty `topics`).
  ## The first chunk that times out or returns an error, or any raised
  ## `CatchableError`, causes the peer to be removed from the shard via
  ## `removePeer` and false to be returned.
  let chunkSize = filter_protocol.MaxContentTopicsPerRequest
  try:
    var start = 0
    while start < topics.len:
      let chunk = topics[start ..< min(start + chunkSize, topics.len)]
      var chunkOk = false
      case kind
      of FilterSubscribe:
        let pendingReq = kernel_filter_api.RequestFilterSubscribe.request(
          self.node.brokerCtx, peer, shard, chunk
        )
        chunkOk =
          (await pendingReq.withTimeout(EdgeFilterSubscribeTimeout)) and
          not pendingReq.read().isErr()
      of FilterUnsubscribe:
        let pendingReq = kernel_filter_api.RequestFilterUnsubscribe.request(
          self.node.brokerCtx, peer, shard, chunk
        )
        chunkOk =
          (await pendingReq.withTimeout(EdgeFilterSubscribeTimeout)) and
          not pendingReq.read().isErr()

      if not chunkOk:
        trace "sendChunkedFilterRpc: chunk failed",
          op = kind, shard = shard, peer = peer.peerId
        self.removePeer(shard, peer.peerId)
        return false
      start += chunkSize
  except CatchableError as exc:
    debug "sendChunkedFilterRpc: failed",
      op = kind, shard = shard, peer = peer.peerId, err = exc.msg
    self.removePeer(shard, peer.peerId)
    return false

  return true
|
|
|
|
proc syncFilterDeltas(
    self: WakuSubscriptionManager,
    peer: RemotePeerInfo,
    shard: PubsubTopic,
    added: seq[ContentTopic],
    removed: seq[ContentTopic],
) {.async.} =
  ## Bring an already-tracked `peer` up to date on `shard`.
  ## The peer is first subscribed to `added`, then unsubscribed from `removed`.
  ## If the subscribe step fails (in which case `sendChunkedFilterRpc` has
  ## already dropped the peer), the unsubscribe step is skipped.
  if added.len > 0:
    let subscribed = await self.sendChunkedFilterRpc(peer, shard, added, FilterSubscribe)
    if not subscribed:
      return

  if removed.len == 0:
    return
  discard await self.sendChunkedFilterRpc(peer, shard, removed, FilterUnsubscribe)
|
|
|
|
proc dialFilterPeer(
    self: WakuSubscriptionManager,
    peer: RemotePeerInfo,
    shard: PubsubTopic,
    contentTopics: seq[ContentTopic],
) {.async.} =
  ## Subscribe a new peer to all content topics on a shard and start tracking it.
  ##
  ## While the subscribe RPCs are in flight, the peer id is listed in the
  ## shard's `pendingPeers`; the `finally` clause always removes it again.
  ## If every chunk succeeds, the peer is appended to the shard's tracked peers
  ## and shard health is recomputed. Two cases are only logged instead: the
  ## peer is already tracked, or the shard's state was removed meanwhile.
  ## On failure, `sendChunkedFilterRpc` has already logged the error and called
  ## `removePeer`.
  # Mark the dial as in flight so the sub loop does not pick this peer again.
  self.edgeFilterSubStates.withValue(shard, state):
    state.pendingPeers.incl(peer.peerId)

  try:
    if not await self.sendChunkedFilterRpc(peer, shard, contentTopics, FilterSubscribe):
      return

    # Re-look-up the state: it may have been deleted during the awaits above.
    self.edgeFilterSubStates.withValue(shard, state):
      if state.peers.anyIt(it.peerId == peer.peerId):
        trace "dialFilterPeer: peer already tracked, skipping duplicate",
          shard = shard, peer = peer.peerId
        return

      state.peers.add(peer)
      self.updateShardHealth(shard, state[])
      trace "dialFilterPeer: successfully subscribed to all chunks",
        shard = shard, peer = peer.peerId, totalPeers = state.peers.len
    do:
      trace "dialFilterPeer: shard removed while subscribing, discarding result",
        shard = shard, peer = peer.peerId
  finally:
    self.edgeFilterSubStates.withValue(shard, state):
      state.pendingPeers.excl(peer.peerId)
|
|
|
|
proc pingFilterPeer(
    self: WakuSubscriptionManager, peerId: PeerId, peer: RemotePeerInfo
): Future[(PeerId, bool)] {.async: (raises: []).} =
  ## Ping `peer` through the kernel filter API and pair the outcome with
  ## `peerId`. The ping is bounded by `EdgeFilterPingTimeout`.
  ## A failed request counts as a failed ping.
  let reply = await kernel_filter_api.RequestFilterPing.request(
    self.node.brokerCtx, peer, EdgeFilterPingTimeout
  )
  let pong = reply.valueOr:
    return (peerId, false)
  return (peerId, pong.pingOk)
|
|
|
|
proc edgeFilterMaintenanceLoop*(self: WakuSubscriptionManager) {.async.} =
  ## Periodically ping tracked filter service peers and drop unresponsive ones.
  ##
  ## Every `EdgeFilterLoopInterval`, each tracked peer that is currently
  ## connected is pinged; all pings run concurrently. Connected peers that fail
  ## the ping are removed from every shard they serve, health is recomputed for
  ## affected shards, and the sub loop is woken to find replacements.
  ## Disconnected peers are left for the sub loop. Loops until cancelled.
  while true:
    await sleepAsync(EdgeFilterLoopInterval)

    if not self.isFilterMounted():
      warn "filter client is nil within edge filter maintenance loop"
      continue

    # One entry per connected peer, even if it serves several shards.
    var toPing = initTable[PeerId, RemotePeerInfo]()
    for subState in self.edgeFilterSubStates.values:
      for tracked in subState.peers:
        if self.node.peerManager.switch.peerStore.isConnected(tracked.peerId):
          toPing[tracked.peerId] = tracked

    var responsive = initHashSet[PeerId]()
    if toPing.len > 0:
      # Start every ping first, then collect the results.
      var pings: seq[Future[(PeerId, bool)]]
      for peerId, info in toPing:
        pings.add(self.pingFilterPeer(peerId, info))
      for ping in pings:
        let (peerId, answered) = await ping
        if answered:
          responsive.incl(peerId)

    var anyDropped = false
    for shard, subState in self.edgeFilterSubStates.mpairs:
      let before = subState.peers.len
      subState.peers.keepItIf(it.peerId notin toPing or it.peerId in responsive)
      if subState.peers.len >= before:
        continue
      anyDropped = true
      self.updateShardHealth(shard, subState)
      trace "Edge Filter health degraded by Ping failure",
        shard = shard, new = subState.currentHealth

    if anyDropped:
      self.edgeFilterWakeup.fire()
|
|
|
|
proc selectFilterCandidates(
    self: WakuSubscriptionManager, shard: PubsubTopic, exclude: HashSet[PeerId], needed: int
): seq[RemotePeerInfo] =
  ## Pick up to `needed` filter service peers that can serve `shard`.
  ##
  ## Any peer in `exclude` (already tracked or being dialed for this shard) is
  ## skipped. Candidates come from `peerManager.selectPeers` for the
  ## filter-subscribe codec. Peers already tracked as filter service peers on
  ## other shards are placed first; within each group, the order returned by
  ## `selectPeers` is kept. Returns an empty seq when `needed <= 0`.
  if needed <= 0:
    # Nothing to select; also avoids calling setLen with a negative length.
    return @[]

  # Every filter server peer that can serve the shard...
  var allCandidates = self.node.peerManager.selectPeers(
    filter_common.WakuFilterSubscribeCodec, some(shard)
  )
  # ...minus those already used for, or being dialed for, this shard.
  allCandidates.keepItIf(it.peerId notin exclude)

  # Peer ids already serving other shards.
  var trackedOnOther = initHashSet[PeerId]()
  for otherShard, otherState in self.edgeFilterSubStates.pairs:
    if otherShard != shard:
      for peer in otherState.peers:
        trackedOnOther.incl(peer.peerId)

  # Prefer peers already in use as filter service peers elsewhere.
  result =
    allCandidates.filterIt(it.peerId in trackedOnOther) &
    allCandidates.filterIt(it.peerId notin trackedOnOther)

  if result.len > needed:
    result.setLen(needed)
|
|
|
|
proc edgeFilterSubLoop*(self: WakuSubscriptionManager) {.async.} =
  ## Reconciles filter subscriptions with the desired state.
  ##
  ## Each pass waits for `edgeFilterWakeup`, then sleeps
  ## `EdgeFilterSubLoopDebounce` so a burst of wakeups collapses into one pass.
  ## It compares the current edge content-topic interest (`desired`, a copy)
  ## with what the previous pass synced (`lastSynced`). For every shard in
  ## either of them:
  ## - disconnected peers are dropped and finished dial futures are pruned;
  ## - topic additions/removals are pushed to tracked peers in background tasks;
  ## - a shard with no remaining interest is queued for deletion;
  ## - otherwise health is recomputed, and new peers are queued for dialing
  ##   until tracked plus in-flight dials reach `HealthyThreshold`.
  ## Deletions and dials are applied after the scan. Loops until cancelled.
  var lastSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()

  while true:
    await self.edgeFilterWakeup.wait()
    await sleepAsync(EdgeFilterSubLoopDebounce)
    # Clear after the debounce so wakeups during the sleep are absorbed here.
    self.edgeFilterWakeup.clear()
    trace "edgeFilterSubLoop: woke up"

    if not self.isFilterMounted():
      trace "edgeFilterSubLoop: wakuFilterClient is nil, skipping"
      continue

    # Table assignment copies, so this snapshot is unaffected by later edits.
    let desired = self.edgeContentTopicSubs

    trace "edgeFilterSubLoop: desired state", numShards = desired.len

    let allShards = toHashSet(toSeq(desired.keys)) + toHashSet(toSeq(lastSynced.keys))

    # Step 1: read state across all shards; build dial tasks and shards to delete.

    var dialTasks: seq[EdgeDialTask]
    var shardsToDelete: seq[PubsubTopic]

    for shard in allShards:
      let currTopics = desired.getOrDefault(shard)
      let prevTopics = lastSynced.getOrDefault(shard)

      if shard notin self.edgeFilterSubStates:
        self.edgeFilterSubStates[shard] =
          EdgeFilterSubState(currentHealth: TopicHealth.UNHEALTHY)

      let addedTopics = toSeq(currTopics - prevTopics)
      let removedTopics = toSeq(prevTopics - currTopics)

      self.edgeFilterSubStates.withValue(shard, state):
        state.peers.keepItIf(
          self.node.peerManager.switch.peerStore.isConnected(it.peerId)
        )
        state.pending.keepItIf(not it.finished)

        # NOTE(review): deltas go only to peers in `state.peers`. A peer whose
        # dial is still pending was dialed with the topic list from when its
        # task was created, so topics added meanwhile appear never to reach
        # it once `lastSynced` advances — confirm.
        if addedTopics.len > 0 or removedTopics.len > 0:
          for peer in state.peers:
            asyncSpawn self.syncFilterDeltas(peer, shard, addedTopics, removedTopics)

        if currTopics.len == 0:
          shardsToDelete.add(shard)
        else:
          self.updateShardHealth(shard, state[])

          # In-flight dials count toward the target so peers are not over-dialed.
          let needed = max(0, HealthyThreshold - state.peers.len - state.pending.len)

          if needed > 0:
            let tracked = state.peers.mapIt(it.peerId).toHashSet() + state.pendingPeers
            let candidates = self.selectFilterCandidates(shard, tracked, needed)
            let toDial = min(needed, candidates.len)

            trace "edgeFilterSubLoop: shard reconciliation",
              shard = shard,
              num_peers = state.peers.len,
              num_pending = state.pending.len,
              num_needed = needed,
              num_available = candidates.len,
              toDial = toDial

            for i in 0 ..< toDial:
              dialTasks.add(
                EdgeDialTask(
                  peer: candidates[i], shard: shard, topics: toSeq(currTopics)
                )
              )

    # Step 2: execute deferred shard deletion and dial tasks.

    for shard in shardsToDelete:
      # NOTE(review): this awaits while holding the withValue pointer; it relies
      # on no other task inserting/deleting edgeFilterSubStates keys meanwhile.
      self.edgeFilterSubStates.withValue(shard, state):
        for fut in state.pending:
          if not fut.finished:
            await fut.cancelAndWait()
      self.edgeFilterSubStates.del(shard)

    for task in dialTasks:
      let fut = self.dialFilterPeer(task.peer, task.shard, task.topics)
      self.edgeFilterSubStates.withValue(task.shard, state):
        state.pending.add(fut)

    lastSynced = desired
|
|
|
|
proc startEdgeFilterLoops(self: WakuSubscriptionManager): Result[void, string] =
  ## Start the edge filter orchestration loops.
  ## Only valid in edge mode (relay nil, filter client present).
  ##
  ## This creates the wakeup event and subscribes to peer events, so that
  ## disconnects and metadata updates wake the sub loop. It then starts the sub
  ## loop and the maintenance loop.
  ##
  ## Edge content-topic interest may have been recorded before this call, when
  ## no wakeup event existed and nothing could fire it. In that case the sub
  ## loop is woken right away, so the interest is reconciled without waiting
  ## for an unrelated peer event.
  ## Returns err only if the peer-event listener cannot be registered.
  self.edgeFilterWakeup = newAsyncEvent()

  self.peerEventListener = EventWakuPeer.listen(
    self.node.brokerCtx,
    proc(evt: EventWakuPeer) {.async: (raises: []), gcsafe.} =
      if evt.kind == EventWakuPeerKind.EventDisconnected or
          evt.kind == EventWakuPeerKind.EventMetadataUpdated:
        self.edgeFilterWakeup.fire()
    ,
  ).valueOr:
    return err("Failed to listen to peer events for edge filter: " & error)

  self.edgeFilterSubLoopFut = self.edgeFilterSubLoop()
  self.edgeFilterMaintenanceLoopFut = self.edgeFilterMaintenanceLoop()

  # Interest added before start could not fire the (then nil) wakeup event.
  if self.edgeContentTopicSubs.len > 0:
    self.edgeFilterWakeup.fire()

  return ok()
|
|
|
|
proc stopEdgeFilterLoops(self: WakuSubscriptionManager) {.async: (raises: []).} =
  ## Stop the edge filter orchestration loops and clean up.
  ## Steps, in order:
  ## - cancel the sub loop and the maintenance loop, waiting for each;
  ## - cancel every unfinished pending dial across all shard states;
  ## - drop the peer-event listener registered by `startEdgeFilterLoops`.
  if not self.edgeFilterSubLoopFut.isNil():
    await self.edgeFilterSubLoopFut.cancelAndWait()
    self.edgeFilterSubLoopFut = nil

  if not self.edgeFilterMaintenanceLoopFut.isNil():
    await self.edgeFilterMaintenanceLoopFut.cancelAndWait()
    self.edgeFilterMaintenanceLoopFut = nil

  for subState in self.edgeFilterSubStates.values:
    for dial in subState.pending:
      if dial.finished:
        continue
      await dial.cancelAndWait()

  await EventWakuPeer.dropListener(self.node.brokerCtx, self.peerEventListener)
|
|
|
|
# WakuSubscriptionManager lifecycle.
|
|
# start/stopWakuSubscriptionManager orchestrate the relay and edge paths and
|
|
# register/clear broker providers.
|
|
|
|
proc startWakuSubscriptionManager*(self: WakuSubscriptionManager): Result[void, string] =
  ## Start the subscription manager.
  ##
  ## First it registers every broker request provider backed by this manager
  ## and subscribes to shard health changes, re-emitting them per content
  ## topic. Then it starts the mode-specific machinery:
  ## - edge mode (relay not mounted): returns the result of
  ##   `startEdgeFilterLoops`;
  ## - relay mode: subscribes relay to shards 0 ..< `shardCountGenZero` of the
  ##   auto-sharding cluster, when auto-sharding is configured.
  ##
  ## Failures to register a provider or to auto-subscribe a shard are only
  ## logged. An err is returned only if the health listener cannot be
  ## registered, or, in edge mode, if the edge loops fail to start.
  ## NOTE(review): on that early err the providers registered above stay in
  ## place — confirm callers invoke stopWakuSubscriptionManager in that case.

  # Health of one edge shard; NOT_SUBSCRIBED when it has no edge filter state.
  RequestEdgeShardHealth.setProvider(
    self.node.brokerCtx,
    proc(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] =
      self.edgeFilterSubStates.withValue(shard, state):
        return ok(RequestEdgeShardHealth(health: state.currentHealth))
      return ok(RequestEdgeShardHealth(health: TopicHealth.NOT_SUBSCRIBED)),
  ).isOkOr:
    error "Can't set provider for RequestEdgeShardHealth", error = error

  # Minimum tracked-peer count across all edge shard states (0 when none exist).
  RequestEdgeFilterPeerCount.setProvider(
    self.node.brokerCtx,
    proc(): Result[RequestEdgeFilterPeerCount, string] =
      var minPeers = high(int)
      for state in self.edgeFilterSubStates.values:
        minPeers = min(minPeers, state.peers.len)
      if minPeers == high(int):
        minPeers = 0
      return ok(RequestEdgeFilterPeerCount(peerCount: minPeers)),
  ).isOkOr:
    error "Can't set provider for RequestEdgeFilterPeerCount", error = error

  # The four-operation subscription surface on the broker.
  RequestRelaySubscribeShard.setProvider(
    self.node.brokerCtx,
    proc(shard: PubsubTopic): Result[RequestRelaySubscribeShard, string] =
      self.subscribeShard(shard).isOkOr:
        return err(error)
      return ok(RequestRelaySubscribeShard(subscribed: true)),
  ).isOkOr:
    error "Can't set provider for RequestRelaySubscribeShard", error = error

  RequestRelayUnsubscribeShard.setProvider(
    self.node.brokerCtx,
    proc(shard: PubsubTopic): Result[RequestRelayUnsubscribeShard, string] =
      self.unsubscribeShard(shard).isOkOr:
        return err(error)
      return ok(RequestRelayUnsubscribeShard(unsubscribed: true)),
  ).isOkOr:
    error "Can't set provider for RequestRelayUnsubscribeShard", error = error

  RequestRelaySubscribeContentTopic.setProvider(
    self.node.brokerCtx,
    proc(
        contentTopic: ContentTopic, shard: Option[PubsubTopic]
    ): Result[RequestRelaySubscribeContentTopic, string] =
      self.relaySubscribeContentTopic(contentTopic, shard).isOkOr:
        return err(error)
      return ok(RequestRelaySubscribeContentTopic(subscribed: true)),
  ).isOkOr:
    error "Can't set provider for RequestRelaySubscribeContentTopic", error = error

  RequestRelayUnsubscribeContentTopic.setProvider(
    self.node.brokerCtx,
    proc(
        contentTopic: ContentTopic, shard: Option[PubsubTopic]
    ): Result[RequestRelayUnsubscribeContentTopic, string] =
      self.relayUnsubscribeContentTopic(contentTopic, shard).isOkOr:
        return err(error)
      return ok(RequestRelayUnsubscribeContentTopic(unsubscribed: true)),
  ).isOkOr:
    error "Can't set provider for RequestRelayUnsubscribeContentTopic", error = error

  RequestEdgeSubscribe.setProvider(
    self.node.brokerCtx,
    proc(
        contentTopic: ContentTopic, shard: Option[PubsubTopic]
    ): Result[RequestEdgeSubscribe, string] =
      self.edgeSubscribe(contentTopic, shard).isOkOr:
        return err(error)
      return ok(RequestEdgeSubscribe(subscribed: true)),
  ).isOkOr:
    error "Can't set provider for RequestEdgeSubscribe", error = error

  RequestEdgeUnsubscribe.setProvider(
    self.node.brokerCtx,
    proc(
        contentTopic: ContentTopic, shard: Option[PubsubTopic]
    ): Result[RequestEdgeUnsubscribe, string] =
      self.edgeUnsubscribe(contentTopic, shard).isOkOr:
        return err(error)
      return ok(RequestEdgeUnsubscribe(unsubscribed: true)),
  ).isOkOr:
    error "Can't set provider for RequestEdgeUnsubscribe", error = error

  # Membership queries; shard-resolution errors are propagated via `?`.
  RequestIsRelaySubscribed.setProvider(
    self.node.brokerCtx,
    proc(
        contentTopic: ContentTopic, shard: Option[PubsubTopic]
    ): Result[RequestIsRelaySubscribed, string] =
      let resolved = ?self.resolveShard(contentTopic, shard)
      return ok(
        RequestIsRelaySubscribed(subscribed: self.isRelaySubscribed(resolved, contentTopic))
      ),
  ).isOkOr:
    error "Can't set provider for RequestIsRelaySubscribed", error = error

  RequestIsEdgeSubscribed.setProvider(
    self.node.brokerCtx,
    proc(
        contentTopic: ContentTopic, shard: Option[PubsubTopic]
    ): Result[RequestIsEdgeSubscribed, string] =
      let resolved = ?self.resolveShard(contentTopic, shard)
      return ok(
        RequestIsEdgeSubscribed(subscribed: self.isEdgeSubscribed(resolved, contentTopic))
      ),
  ).isOkOr:
    error "Can't set provider for RequestIsEdgeSubscribed", error = error

  RequestIsSubscribed.setProvider(
    self.node.brokerCtx,
    proc(
        contentTopic: ContentTopic, shard: Option[PubsubTopic]
    ): Result[RequestIsSubscribed, string] =
      let resolved = ?self.resolveShard(contentTopic, shard)
      # Default multiplexing: relay if mounted, else edge.
      return ok(
        RequestIsSubscribed(
          subscribed:
            if self.isRelayMounted():
              self.isRelaySubscribed(resolved, contentTopic)
            else:
              self.isEdgeSubscribed(resolved, contentTopic)
        )
      ),
  ).isOkOr:
    error "Can't set provider for RequestIsSubscribed", error = error

  # Listing queries: (shard, content topics) pairs, empty shards omitted.
  RequestRelaySubscribedTopics.setProvider(
    self.node.brokerCtx,
    proc(): Result[RequestRelaySubscribedTopics, string] =
      var topics: seq[tuple[shard: PubsubTopic, contentTopics: seq[ContentTopic]]]
      for shard, cTopics in self.relaySubscribedTopics:
        topics.add((shard: shard, contentTopics: toSeq(cTopics)))
      return ok(RequestRelaySubscribedTopics(topics: topics)),
  ).isOkOr:
    error "Can't set provider for RequestRelaySubscribedTopics", error = error

  # Default multiplexing: relay if mounted, else edge.
  RequestSubscribedTopics.setProvider(
    self.node.brokerCtx,
    proc(): Result[RequestSubscribedTopics, string] =
      var topics: seq[tuple[shard: PubsubTopic, contentTopics: seq[ContentTopic]]]
      if self.isRelayMounted():
        for shard, cTopics in self.relaySubscribedTopics:
          topics.add((shard: shard, contentTopics: toSeq(cTopics)))
      else:
        for shard, cTopics in self.edgeSubscribedTopics:
          topics.add((shard: shard, contentTopics: toSeq(cTopics)))
      return ok(RequestSubscribedTopics(topics: topics)),
  ).isOkOr:
    error "Can't set provider for RequestSubscribedTopics", error = error

  RequestEdgeSubscribedTopics.setProvider(
    self.node.brokerCtx,
    proc(): Result[RequestEdgeSubscribedTopics, string] =
      var topics: seq[tuple[shard: PubsubTopic, contentTopics: seq[ContentTopic]]]
      for shard, cTopics in self.edgeSubscribedTopics:
        topics.add((shard: shard, contentTopics: toSeq(cTopics)))
      return ok(RequestEdgeSubscribedTopics(topics: topics)),
  ).isOkOr:
    error "Can't set provider for RequestEdgeSubscribedTopics", error = error

  # Fan out shard-health changes to per-content-topic health events. A content
  # topic's health is its shard's health. Set up in both modes.
  self.shardHealthListener = EventShardTopicHealthChange.listen(
    self.node.brokerCtx,
    proc(evt: EventShardTopicHealthChange) {.async: (raises: []), gcsafe.} =
      # Union of relay and edge interest on the shard whose health changed.
      let cTopics =
        self.relayContentTopicSubs.getOrDefault(evt.topic) +
        self.edgeContentTopicSubs.getOrDefault(evt.topic)
      for ct in cTopics:
        EventContentTopicHealthChange.emit(self.node.brokerCtx, ct, evt.health)
    ,
  ).valueOr:
    return err("Failed to listen to shard health events: " & error)

  # Edge mode: no relay, so filter peer maintenance drives delivery.
  if not self.isRelayMounted():
    return self.startEdgeFilterLoops()

  # Core mode: auto-subscribe relay to all autosharding shards.
  if self.node.wakuAutoSharding.isSome():
    let autoSharding = self.node.wakuAutoSharding.get()
    let clusterId = autoSharding.clusterId
    let numShards = autoSharding.shardCountGenZero

    if numShards > 0:
      for i in 0 ..< numShards:
        let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
        self.subscribeShard(PubsubTopic($shardObj)).isOkOr:
          error "Failed to auto-subscribe Relay to cluster shard: ",
            shard = $shardObj, error = error
  else:
    info "WakuSubscriptionManager has no AutoSharding configured; skipping auto-subscribe."

  return ok()
|
|
|
|
proc stopWakuSubscriptionManager*(self: WakuSubscriptionManager) {.async: (raises: []).} =
  ## Undo `startWakuSubscriptionManager`.
  ## In edge mode the edge filter loops are stopped first. Then the shard-health
  ## listener is dropped and every broker provider registered at start is
  ## cleared.
  if not self.isRelayMounted():
    await self.stopEdgeFilterLoops()

  let ctx = self.node.brokerCtx
  await EventShardTopicHealthChange.dropListener(ctx, self.shardHealthListener)

  # Edge health queries.
  RequestEdgeShardHealth.clearProvider(ctx)
  RequestEdgeFilterPeerCount.clearProvider(ctx)

  # Subscribe / unsubscribe operations.
  RequestRelaySubscribeShard.clearProvider(ctx)
  RequestRelayUnsubscribeShard.clearProvider(ctx)
  RequestRelaySubscribeContentTopic.clearProvider(ctx)
  RequestRelayUnsubscribeContentTopic.clearProvider(ctx)
  RequestEdgeSubscribe.clearProvider(ctx)
  RequestEdgeUnsubscribe.clearProvider(ctx)

  # Membership and listing queries.
  RequestIsRelaySubscribed.clearProvider(ctx)
  RequestIsEdgeSubscribed.clearProvider(ctx)
  RequestIsSubscribed.clearProvider(ctx)
  RequestRelaySubscribedTopics.clearProvider(ctx)
  RequestEdgeSubscribedTopics.clearProvider(ctx)
  RequestSubscribedTopics.clearProvider(ctx)
|