mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-04 05:00:02 +00:00
688 lines
24 KiB
Nim
688 lines
24 KiB
Nim
import std/[sequtils, sets, tables, options], chronos, chronicles, metrics, results
|
|
import libp2p/[peerid, peerinfo]
|
|
import brokers/broker_context
|
|
|
|
import
|
|
waku/[
|
|
waku_core,
|
|
waku_core/topics/sharding,
|
|
node/node_types,
|
|
node/node_telemetry,
|
|
waku_relay,
|
|
waku_archive,
|
|
waku_store_sync,
|
|
waku_filter_v2/common as filter_common,
|
|
waku_filter_v2/client as filter_client,
|
|
waku_filter_v2/protocol as filter_protocol,
|
|
events/health_events,
|
|
events/message_events,
|
|
events/peer_events,
|
|
requests/health_requests,
|
|
node/peer_manager,
|
|
node/health_monitor/topic_health,
|
|
node/health_monitor/connection_status,
|
|
]
|
|
|
|
{.push raises: [].}
|
|
|
|
proc doRelaySubscribe(
|
|
node: WakuNode, shard: PubsubTopic, appHandler: WakuRelayHandler = nil
|
|
): bool =
|
|
let alreadySubscribed = node.wakuRelay.isSubscribed(shard)
|
|
|
|
if not appHandler.isNil():
|
|
if not alreadySubscribed or not node.legacyAppHandlers.hasKey(shard):
|
|
node.legacyAppHandlers[shard] = appHandler
|
|
else:
|
|
debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler",
|
|
topic = shard
|
|
|
|
if alreadySubscribed:
|
|
return false
|
|
|
|
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
|
let msgSizeKB = msg.payload.len / 1000
|
|
|
|
waku_node_messages.inc(labelValues = ["relay"])
|
|
waku_histogram_message_size.observe(msgSizeKB)
|
|
|
|
proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
|
if node.wakuFilter.isNil():
|
|
return
|
|
|
|
await node.wakuFilter.handleMessage(topic, msg)
|
|
|
|
proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
|
if node.wakuArchive.isNil():
|
|
return
|
|
|
|
await node.wakuArchive.handleMessage(topic, msg)
|
|
|
|
proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
|
if node.wakuStoreReconciliation.isNil():
|
|
return
|
|
|
|
node.wakuStoreReconciliation.messageIngress(topic, msg)
|
|
|
|
proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
|
MessageSeenEvent.emit(node.brokerCtx, topic, msg)
|
|
|
|
let uniqueTopicHandler = proc(
|
|
topic: PubsubTopic, msg: WakuMessage
|
|
): Future[void] {.async, gcsafe.} =
|
|
await traceHandler(topic, msg)
|
|
await filterHandler(topic, msg)
|
|
await archiveHandler(topic, msg)
|
|
await syncHandler(topic, msg)
|
|
await internalHandler(topic, msg)
|
|
|
|
if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil():
|
|
await node.legacyAppHandlers[topic](topic, msg)
|
|
|
|
node.wakuRelay.subscribe(shard, uniqueTopicHandler)
|
|
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: shard))
|
|
return true
|
|
|
|
proc doRelayUnsubscribe(node: WakuNode, shard: PubsubTopic) =
|
|
if node.legacyAppHandlers.hasKey(shard):
|
|
node.legacyAppHandlers.del(shard)
|
|
|
|
if node.wakuRelay.isSubscribed(shard):
|
|
node.wakuRelay.unsubscribe(shard)
|
|
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: shard))
|
|
|
|
proc new*(T: type SubscriptionManager, node: WakuNode): T =
|
|
T(
|
|
node: node,
|
|
shards: initTable[PubsubTopic, ShardSubscription](),
|
|
edgeFilterSubStates: initTable[PubsubTopic, EdgeFilterSubState](),
|
|
edgeFilterWakeup: newAsyncEvent(),
|
|
)
|
|
|
|
func wanted(entry: ShardSubscription): bool =
|
|
## True if the shard has content-topic interest or a direct subscription.
|
|
return entry.contentTopics.len > 0 or entry.directShardSub
|
|
|
|
proc isContentSubscribed*(
|
|
self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
|
|
): bool =
|
|
self.shards.withValue(shard, sub):
|
|
return contentTopic in sub.contentTopics
|
|
return false
|
|
|
|
iterator subscribedContentTopics*(
|
|
self: SubscriptionManager
|
|
): (PubsubTopic, HashSet[ContentTopic]) =
|
|
## Yields each shard with its non-empty content-topic set.
|
|
for shard, sub in self.shards.pairs:
|
|
if sub.contentTopics.len > 0:
|
|
yield (shard, sub.contentTopics)
|
|
|
|
func toTopicHealth*(peersCount: int): TopicHealth =
|
|
if peersCount >= HealthyThreshold:
|
|
return TopicHealth.SUFFICIENTLY_HEALTHY
|
|
elif peersCount > 0:
|
|
return TopicHealth.MINIMALLY_HEALTHY
|
|
else:
|
|
return TopicHealth.UNHEALTHY
|
|
|
|
proc edgeFilterPeerCount*(self: SubscriptionManager, shard: PubsubTopic): int =
|
|
self.edgeFilterSubStates.withValue(shard, state):
|
|
return state.peers.len
|
|
return 0
|
|
|
|
proc getShardForContentTopic(
|
|
self: SubscriptionManager, topic: ContentTopic
|
|
): Result[PubsubTopic, string] =
|
|
if self.node.wakuAutoSharding.isSome():
|
|
let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
|
|
return ok($shardObj)
|
|
|
|
return err("autosharding is not configured; pass an explicit shard")
|
|
|
|
proc subscribeShard*(
|
|
self: SubscriptionManager, shard: PubsubTopic, handler: WakuRelayHandler = nil
|
|
): Result[void, string] =
|
|
## Subscribes to the shard directly and joins the relay mesh.
|
|
var added = false
|
|
self.shards.withValue(shard, entry):
|
|
if not entry.directShardSub:
|
|
entry.directShardSub = true
|
|
added = true
|
|
do:
|
|
self.shards[shard] = ShardSubscription(
|
|
contentTopics: initHashSet[ContentTopic](), directShardSub: true
|
|
)
|
|
added = true
|
|
if added:
|
|
self.edgeFilterWakeup.fire()
|
|
if not isNil(self.node.wakuRelay):
|
|
discard self.node.doRelaySubscribe(shard, handler)
|
|
return ok()
|
|
|
|
proc unsubscribeShard*(
|
|
self: SubscriptionManager, shard: PubsubTopic
|
|
): Result[void, string] =
|
|
## Drops the direct shard subscription; unsubscribes the mesh if no content topic wants it.
|
|
var removed = false
|
|
var shardEmpty = false
|
|
self.shards.withValue(shard, entry):
|
|
if entry.directShardSub:
|
|
entry.directShardSub = false
|
|
removed = true
|
|
shardEmpty = not entry[].wanted()
|
|
if removed:
|
|
self.edgeFilterWakeup.fire()
|
|
if shardEmpty:
|
|
self.shards.del(shard)
|
|
if not isNil(self.node.wakuRelay):
|
|
self.node.doRelayUnsubscribe(shard)
|
|
return ok()
|
|
|
|
proc subscribe*(
|
|
self: SubscriptionManager,
|
|
shard: PubsubTopic,
|
|
contentTopic: ContentTopic,
|
|
handler: WakuRelayHandler = nil,
|
|
): Result[void, string] =
|
|
## Adds content-topic interest on the shard and joins the relay mesh.
|
|
var added = false
|
|
self.shards.withValue(shard, entry):
|
|
if contentTopic notin entry.contentTopics:
|
|
entry.contentTopics.incl(contentTopic)
|
|
added = true
|
|
do:
|
|
var entry = ShardSubscription(contentTopics: initHashSet[ContentTopic]())
|
|
entry.contentTopics.incl(contentTopic)
|
|
self.shards[shard] = entry
|
|
added = true
|
|
if added:
|
|
self.edgeFilterWakeup.fire()
|
|
if not isNil(self.node.wakuRelay):
|
|
discard self.node.doRelaySubscribe(shard, handler)
|
|
return ok()
|
|
|
|
proc unsubscribe*(
|
|
self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
|
|
): Result[void, string] =
|
|
## Drops content-topic interest on the shard; unsubscribes the mesh if nothing else wants it.
|
|
var removed = false
|
|
var shardEmpty = false
|
|
self.shards.withValue(shard, entry):
|
|
if contentTopic in entry.contentTopics:
|
|
entry.contentTopics.excl(contentTopic)
|
|
removed = true
|
|
shardEmpty = not entry[].wanted()
|
|
if removed:
|
|
self.edgeFilterWakeup.fire()
|
|
if shardEmpty:
|
|
self.shards.del(shard)
|
|
if not isNil(self.node.wakuRelay):
|
|
self.node.doRelayUnsubscribe(shard)
|
|
return ok()
|
|
|
|
proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] =
|
|
## Subscribes to a content topic, resolving its shard via autosharding.
|
|
let shard = ?self.getShardForContentTopic(topic)
|
|
return self.subscribe(shard, topic)
|
|
|
|
proc unsubscribe*(
|
|
self: SubscriptionManager, topic: ContentTopic
|
|
): Result[void, string] =
|
|
## Unsubscribes from a content topic, resolving its shard via autosharding.
|
|
let shard = ?self.getShardForContentTopic(topic)
|
|
return self.unsubscribe(shard, topic)
|
|
|
|
proc unsubscribeAll*(
|
|
self: SubscriptionManager, shard: PubsubTopic
|
|
): Result[void, string] =
|
|
## Drops every content topic on the shard, then the direct subscription.
|
|
var snapshot: seq[ContentTopic]
|
|
self.shards.withValue(shard, sub):
|
|
snapshot = toSeq(sub.contentTopics)
|
|
for contentTopic in snapshot:
|
|
?self.unsubscribe(shard, contentTopic)
|
|
return self.unsubscribeShard(shard)
|
|
|
|
proc isSubscribed*(
|
|
self: SubscriptionManager, topic: ContentTopic
|
|
): Result[bool, string] =
|
|
let shard = ?self.getShardForContentTopic(topic)
|
|
return ok(self.isContentSubscribed(shard, topic))
|
|
|
|
proc subscribeAllAutoshards*(self: SubscriptionManager): Result[void, string] =
|
|
## Subscribes the relay to every shard in the configured autosharding cluster.
|
|
if self.node.wakuRelay.isNil() or self.node.wakuAutoSharding.isNone():
|
|
return ok()
|
|
|
|
let autoSharding = self.node.wakuAutoSharding.get()
|
|
let numShards = autoSharding.shardCountGenZero
|
|
if numShards == 0:
|
|
return ok()
|
|
|
|
for i in 0'u32 ..< numShards:
|
|
let shardObj = RelayShard(clusterId: autoSharding.clusterId, shardId: uint16(i))
|
|
self.subscribeShard(PubsubTopic($shardObj)).isOkOr:
|
|
error "failed to auto-subscribe relay to cluster shard",
|
|
shard = $shardObj, error = error
|
|
|
|
ok()
|
|
|
|
{.pop.}
|
|
|
|
const EdgeFilterSubscribeTimeout = chronos.seconds(15)
|
|
## Timeout for a single filter subscribe/unsubscribe RPC to a service peer.
|
|
const EdgeFilterPingTimeout = chronos.seconds(5)
|
|
## Timeout for a filter ping health check.
|
|
const EdgeFilterLoopInterval = chronos.seconds(30)
|
|
## Interval for the edge filter health ping loop.
|
|
const EdgeFilterSubLoopDebounce = chronos.seconds(1)
|
|
## Debounce delay to coalesce rapid-fire wakeups into a single reconciliation pass.
|
|
|
|
type EdgeDialTask = object
|
|
peer: RemotePeerInfo
|
|
shard: PubsubTopic
|
|
topics: seq[ContentTopic]
|
|
|
|
proc updateShardHealth(
|
|
self: SubscriptionManager, shard: PubsubTopic, state: var EdgeFilterSubState
|
|
) =
|
|
## Recompute and emit health for a shard after its peer set changed.
|
|
let newHealth = toTopicHealth(state.peers.len)
|
|
if newHealth != state.currentHealth:
|
|
state.currentHealth = newHealth
|
|
EventShardTopicHealthChange.emit(self.node.brokerCtx, shard, newHealth)
|
|
|
|
proc removePeer(self: SubscriptionManager, shard: PubsubTopic, peerId: PeerId) =
|
|
## Remove a peer from edgeFilterSubStates for the given shard,
|
|
## update health, and wake the sub loop to dial a replacement.
|
|
## Best-effort unsubscribe so the service peer stops pushing to us.
|
|
self.edgeFilterSubStates.withValue(shard, state):
|
|
var idx = -1
|
|
for i, p in state.peers:
|
|
if p.peerId == peerId:
|
|
idx = i
|
|
break
|
|
if idx < 0:
|
|
return
|
|
|
|
let peer = state.peers[idx]
|
|
state.peers.del(idx)
|
|
self.updateShardHealth(shard, state[])
|
|
self.edgeFilterWakeup.fire()
|
|
|
|
if not self.node.wakuFilterClient.isNil():
|
|
self.shards.withValue(shard, sub):
|
|
let ct = toSeq(sub.contentTopics)
|
|
if ct.len > 0:
|
|
proc doUnsubscribe() {.async.} =
|
|
discard await self.node.wakuFilterClient.unsubscribe(peer, shard, ct)
|
|
|
|
asyncSpawn doUnsubscribe()
|
|
|
|
type SendChunkedFilterRpcKind = enum
|
|
FilterSubscribe
|
|
FilterUnsubscribe
|
|
|
|
proc sendChunkedFilterRpc(
|
|
self: SubscriptionManager,
|
|
peer: RemotePeerInfo,
|
|
shard: PubsubTopic,
|
|
topics: seq[ContentTopic],
|
|
kind: SendChunkedFilterRpcKind,
|
|
): Future[bool] {.async.} =
|
|
## Send a chunked filter subscribe or unsubscribe RPC. Returns true on
|
|
## success. On failure the peer is removed and false is returned.
|
|
try:
|
|
var i = 0
|
|
while i < topics.len:
|
|
let chunk =
|
|
topics[i ..< min(i + filter_protocol.MaxContentTopicsPerRequest, topics.len)]
|
|
let fut =
|
|
case kind
|
|
of FilterSubscribe:
|
|
self.node.wakuFilterClient.subscribe(peer, shard, chunk)
|
|
of FilterUnsubscribe:
|
|
self.node.wakuFilterClient.unsubscribe(peer, shard, chunk)
|
|
if not (await fut.withTimeout(EdgeFilterSubscribeTimeout)) or fut.read().isErr():
|
|
trace "sendChunkedFilterRpc: chunk failed",
|
|
op = kind, shard = shard, peer = peer.peerId
|
|
self.removePeer(shard, peer.peerId)
|
|
return false
|
|
i += filter_protocol.MaxContentTopicsPerRequest
|
|
except CatchableError as exc:
|
|
debug "sendChunkedFilterRpc: failed",
|
|
op = kind, shard = shard, peer = peer.peerId, err = exc.msg
|
|
self.removePeer(shard, peer.peerId)
|
|
return false
|
|
return true
|
|
|
|
proc syncFilterDeltas(
|
|
self: SubscriptionManager,
|
|
peer: RemotePeerInfo,
|
|
shard: PubsubTopic,
|
|
added: seq[ContentTopic],
|
|
removed: seq[ContentTopic],
|
|
) {.async.} =
|
|
## Push content topic changes (adds/removes) to an already-tracked peer.
|
|
if added.len > 0:
|
|
if not await self.sendChunkedFilterRpc(peer, shard, added, FilterSubscribe):
|
|
return
|
|
|
|
if removed.len > 0:
|
|
discard await self.sendChunkedFilterRpc(peer, shard, removed, FilterUnsubscribe)
|
|
|
|
proc dialFilterPeer(
|
|
self: SubscriptionManager,
|
|
peer: RemotePeerInfo,
|
|
shard: PubsubTopic,
|
|
contentTopics: seq[ContentTopic],
|
|
) {.async.} =
|
|
## Subscribe a new peer to all content topics on a shard and start tracking it.
|
|
self.edgeFilterSubStates.withValue(shard, state):
|
|
state.pendingPeers.incl(peer.peerId)
|
|
|
|
try:
|
|
if not await self.sendChunkedFilterRpc(peer, shard, contentTopics, FilterSubscribe):
|
|
return
|
|
|
|
self.edgeFilterSubStates.withValue(shard, state):
|
|
if state.peers.anyIt(it.peerId == peer.peerId):
|
|
trace "dialFilterPeer: peer already tracked, skipping duplicate",
|
|
shard = shard, peer = peer.peerId
|
|
return
|
|
|
|
state.peers.add(peer)
|
|
self.updateShardHealth(shard, state[])
|
|
trace "dialFilterPeer: successfully subscribed to all chunks",
|
|
shard = shard, peer = peer.peerId, totalPeers = state.peers.len
|
|
do:
|
|
trace "dialFilterPeer: shard removed while subscribing, discarding result",
|
|
shard = shard, peer = peer.peerId
|
|
finally:
|
|
self.edgeFilterSubStates.withValue(shard, state):
|
|
state.pendingPeers.excl(peer.peerId)
|
|
|
|
proc edgeFilterConnectionLoop(self: SubscriptionManager) {.async.} =
|
|
## Periodically pings all tracked filter service peers to verify they are
|
|
## still alive at the application layer. Peers that fail the ping are removed.
|
|
while true:
|
|
await sleepAsync(EdgeFilterLoopInterval)
|
|
|
|
if self.node.wakuFilterClient.isNil():
|
|
warn "filter client is nil within edge filter connection loop"
|
|
continue
|
|
|
|
var connected = initTable[PeerId, RemotePeerInfo]()
|
|
for state in self.edgeFilterSubStates.values:
|
|
for peer in state.peers:
|
|
if self.node.peerManager.switch.peerStore.isConnected(peer.peerId):
|
|
connected[peer.peerId] = peer
|
|
|
|
var alive = initHashSet[PeerId]()
|
|
|
|
if connected.len > 0:
|
|
var pingTasks: seq[(PeerId, Future[FilterSubscribeResult])]
|
|
for peer in connected.values:
|
|
pingTasks.add(
|
|
(peer.peerId, self.node.wakuFilterClient.ping(peer, EdgeFilterPingTimeout))
|
|
)
|
|
|
|
await allFutures(pingTasks.mapIt(it[1]))
|
|
|
|
for (peerId, task) in pingTasks:
|
|
if task.read().isOk():
|
|
alive.incl(peerId)
|
|
|
|
var changed = false
|
|
for shard, state in self.edgeFilterSubStates.mpairs:
|
|
let oldLen = state.peers.len
|
|
state.peers.keepItIf(it.peerId notin connected or alive.contains(it.peerId))
|
|
|
|
if state.peers.len < oldLen:
|
|
changed = true
|
|
self.updateShardHealth(shard, state)
|
|
trace "Edge Filter health degraded by Ping failure",
|
|
shard = shard, new = state.currentHealth
|
|
|
|
if changed:
|
|
self.edgeFilterWakeup.fire()
|
|
|
|
proc selectFilterCandidates(
|
|
self: SubscriptionManager, shard: PubsubTopic, exclude: HashSet[PeerId], needed: int
|
|
): seq[RemotePeerInfo] =
|
|
## Select filter service peer candidates for a shard.
|
|
|
|
# Start with every filter server peer that can serve the shard
|
|
var allCandidates = self.node.peerManager.selectPeers(
|
|
filter_common.WakuFilterSubscribeCodec, some(shard)
|
|
)
|
|
|
|
# Remove all already used in this shard or being dialed for it
|
|
allCandidates.keepItIf(it.peerId notin exclude)
|
|
|
|
# Collect peer IDs already tracked on other shards
|
|
var trackedOnOther = initHashSet[PeerId]()
|
|
for otherShard, otherState in self.edgeFilterSubStates.pairs:
|
|
if otherShard != shard:
|
|
for peer in otherState.peers:
|
|
trackedOnOther.incl(peer.peerId)
|
|
|
|
# Prefer peers we already have a connection to first, preserving shuffle
|
|
var candidates =
|
|
allCandidates.filterIt(it.peerId in trackedOnOther) &
|
|
allCandidates.filterIt(it.peerId notin trackedOnOther)
|
|
|
|
# We need to return 'needed' peers only
|
|
if candidates.len > needed:
|
|
candidates.setLen(needed)
|
|
return candidates
|
|
|
|
proc edgeFilterSubLoop(self: SubscriptionManager) {.async.} =
|
|
## Reconciles filter subscriptions with the desired state from SubscriptionManager.
|
|
var lastSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()
|
|
|
|
while true:
|
|
await self.edgeFilterWakeup.wait()
|
|
await sleepAsync(EdgeFilterSubLoopDebounce)
|
|
self.edgeFilterWakeup.clear()
|
|
trace "edgeFilterSubLoop: woke up"
|
|
|
|
if isNil(self.node.wakuFilterClient):
|
|
trace "edgeFilterSubLoop: wakuFilterClient is nil, skipping"
|
|
continue
|
|
|
|
var newSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()
|
|
var allShards: HashSet[PubsubTopic]
|
|
for shard, sub in self.shards.pairs:
|
|
if sub.contentTopics.len > 0:
|
|
newSynced[shard] = sub.contentTopics
|
|
allShards.incl(shard)
|
|
for shard in lastSynced.keys:
|
|
allShards.incl(shard)
|
|
|
|
trace "edgeFilterSubLoop: desired state", numShards = newSynced.len
|
|
|
|
# Step 1: read state across all shards at once and
|
|
# create a list of peer dial tasks and shard tracking to delete.
|
|
|
|
var dialTasks: seq[EdgeDialTask]
|
|
var shardsToDelete: seq[PubsubTopic]
|
|
|
|
for shard in allShards:
|
|
# Compute added/removed deltas via direct iteration; no HashSet copies.
|
|
var addedTopics: seq[ContentTopic]
|
|
var removedTopics: seq[ContentTopic]
|
|
newSynced.withValue(shard, curr):
|
|
lastSynced.withValue(shard, prev):
|
|
for t in curr[]:
|
|
if t notin prev[]:
|
|
addedTopics.add(t)
|
|
for t in prev[]:
|
|
if t notin curr[]:
|
|
removedTopics.add(t)
|
|
do:
|
|
for t in curr[]:
|
|
addedTopics.add(t)
|
|
do:
|
|
lastSynced.withValue(shard, prev):
|
|
for t in prev[]:
|
|
removedTopics.add(t)
|
|
|
|
discard self.edgeFilterSubStates.mgetOrPut(
|
|
shard, EdgeFilterSubState(currentHealth: TopicHealth.UNHEALTHY)
|
|
)
|
|
|
|
self.edgeFilterSubStates.withValue(shard, state):
|
|
state.peers.keepItIf(
|
|
self.node.peerManager.switch.peerStore.isConnected(it.peerId)
|
|
)
|
|
state.pending.keepItIf(not it.finished)
|
|
|
|
if addedTopics.len > 0 or removedTopics.len > 0:
|
|
for peer in state.peers:
|
|
asyncSpawn self.syncFilterDeltas(peer, shard, addedTopics, removedTopics)
|
|
|
|
if shard notin newSynced:
|
|
shardsToDelete.add(shard)
|
|
else:
|
|
self.updateShardHealth(shard, state[])
|
|
|
|
let needed = max(0, HealthyThreshold - state.peers.len - state.pending.len)
|
|
|
|
if needed > 0:
|
|
var tracked: HashSet[PeerId]
|
|
for p in state.peers:
|
|
tracked.incl(p.peerId)
|
|
for p in state.pendingPeers:
|
|
tracked.incl(p)
|
|
let candidates = self.selectFilterCandidates(shard, tracked, needed)
|
|
let toDial = min(needed, candidates.len)
|
|
|
|
trace "edgeFilterSubLoop: shard reconciliation",
|
|
shard = shard,
|
|
num_peers = state.peers.len,
|
|
num_pending = state.pending.len,
|
|
num_needed = needed,
|
|
num_available = candidates.len,
|
|
toDial = toDial
|
|
|
|
var dialTopics: seq[ContentTopic]
|
|
newSynced.withValue(shard, curr):
|
|
dialTopics = toSeq(curr[])
|
|
|
|
for i in 0 ..< toDial:
|
|
dialTasks.add(
|
|
EdgeDialTask(peer: candidates[i], shard: shard, topics: dialTopics)
|
|
)
|
|
|
|
# Step 2: execute deferred shard tracking deletion and dial tasks.
|
|
|
|
for shard in shardsToDelete:
|
|
self.edgeFilterSubStates.withValue(shard, state):
|
|
for fut in state.pending:
|
|
if not fut.finished:
|
|
await fut.cancelAndWait()
|
|
self.edgeFilterSubStates.del(shard)
|
|
|
|
for task in dialTasks:
|
|
let fut = self.dialFilterPeer(task.peer, task.shard, task.topics)
|
|
self.edgeFilterSubStates.withValue(task.shard, state):
|
|
state.pending.add(fut)
|
|
|
|
lastSynced = newSynced
|
|
|
|
proc startEdgeFilterLoops(self: SubscriptionManager): Result[void, string] =
|
|
## Start the edge filter orchestration loops.
|
|
## Caller must ensure this is only called in edge mode (relay nil, filter client present).
|
|
self.peerEventListener = WakuPeerEvent.listen(
|
|
self.node.brokerCtx,
|
|
proc(evt: WakuPeerEvent) {.async: (raises: []), gcsafe.} =
|
|
if evt.kind == WakuPeerEventKind.EventDisconnected:
|
|
# We know a peer is gone, so if it was a service filter peer for this
|
|
# edge node, remove it from the list of service filter peers for each
|
|
# shard it served and re-evaluate shard health for the affected shards.
|
|
for shard, state in self.edgeFilterSubStates.mpairs:
|
|
let oldLen = state.peers.len
|
|
state.peers.keepItIf(it.peerId != evt.peerId)
|
|
if state.peers.len < oldLen:
|
|
self.updateShardHealth(shard, state)
|
|
self.edgeFilterWakeup.fire()
|
|
elif evt.kind == WakuPeerEventKind.EventMetadataUpdated:
|
|
self.edgeFilterWakeup.fire()
|
|
,
|
|
).valueOr:
|
|
return err("Failed to listen to peer events for edge filter: " & error)
|
|
|
|
self.edgeFilterSubLoopFut = self.edgeFilterSubLoop()
|
|
self.edgeFilterConnectionLoopFut = self.edgeFilterConnectionLoop()
|
|
return ok()
|
|
|
|
proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} =
|
|
## Stop the edge filter orchestration loops and clean up pending futures.
|
|
if not isNil(self.edgeFilterSubLoopFut):
|
|
await self.edgeFilterSubLoopFut.cancelAndWait()
|
|
self.edgeFilterSubLoopFut = nil
|
|
|
|
if not isNil(self.edgeFilterConnectionLoopFut):
|
|
await self.edgeFilterConnectionLoopFut.cancelAndWait()
|
|
self.edgeFilterConnectionLoopFut = nil
|
|
|
|
for shard, state in self.edgeFilterSubStates:
|
|
for fut in state.pending:
|
|
if not fut.finished:
|
|
await fut.cancelAndWait()
|
|
|
|
await WakuPeerEvent.dropListener(self.node.brokerCtx, self.peerEventListener)
|
|
|
|
proc start*(self: SubscriptionManager): Result[void, string] =
|
|
let edgeShardHealthRes = RequestEdgeShardHealth.setProvider(
|
|
self.node.brokerCtx,
|
|
proc(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] =
|
|
self.edgeFilterSubStates.withValue(shard, state):
|
|
return ok(RequestEdgeShardHealth(health: state.currentHealth))
|
|
return ok(RequestEdgeShardHealth(health: TopicHealth.NOT_SUBSCRIBED)),
|
|
)
|
|
self.ownsEdgeShardHealthProvider = edgeShardHealthRes.isOk()
|
|
if edgeShardHealthRes.isErr():
|
|
error "Can't set provider for RequestEdgeShardHealth",
|
|
error = edgeShardHealthRes.error
|
|
|
|
let edgeFilterPeerCountRes = RequestEdgeFilterPeerCount.setProvider(
|
|
self.node.brokerCtx,
|
|
proc(): Result[RequestEdgeFilterPeerCount, string] =
|
|
var minPeers = high(int)
|
|
for state in self.edgeFilterSubStates.values:
|
|
minPeers = min(minPeers, state.peers.len)
|
|
if minPeers == high(int):
|
|
minPeers = 0
|
|
return ok(RequestEdgeFilterPeerCount(peerCount: minPeers)),
|
|
)
|
|
self.ownsEdgeFilterPeerCountProvider = edgeFilterPeerCountRes.isOk()
|
|
if edgeFilterPeerCountRes.isErr():
|
|
error "Can't set provider for RequestEdgeFilterPeerCount",
|
|
error = edgeFilterPeerCountRes.error
|
|
|
|
# Start Edge workers only when we are in Edge mode (relay not mounted)
|
|
# AND the filter client is mounted (otherwise the loops have nothing
|
|
# to talk to and just spam "filter client is nil" warnings).
|
|
if self.node.wakuRelay.isNil() and not self.node.wakuFilterClient.isNil():
|
|
return self.startEdgeFilterLoops()
|
|
|
|
return ok()
|
|
|
|
proc stop*(self: SubscriptionManager) {.async: (raises: []).} =
|
|
# Stop Edge workers if we started them in `start` (Edge mode + filter client).
|
|
if self.node.wakuRelay.isNil() and not self.node.wakuFilterClient.isNil():
|
|
await self.stopEdgeFilterLoops()
|
|
|
|
# Only clear providers we actually registered: another SubscriptionManager
|
|
# sharing this brokerCtx may have won the race, and clearing its provider
|
|
# would leave the broker silently provider-less.
|
|
if self.ownsEdgeShardHealthProvider:
|
|
RequestEdgeShardHealth.clearProvider(self.node.brokerCtx)
|
|
self.ownsEdgeShardHealthProvider = false
|
|
if self.ownsEdgeFilterPeerCountProvider:
|
|
RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx)
|
|
self.ownsEdgeFilterPeerCountProvider = false
|