mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-03-15 13:33:10 +00:00
* SubscriptionManager tracks shard and content topic interest * RecvService emits MessageReceivedEvent on subscribed content topics * Route MAPI through old Kernel API relay unique-handler infra to avoid code duplication * Encode current gen-zero network policy: on Core node boot, subscribe to all pubsub topics (all shards) * Add test_api_subscriptions.nim (basic relay/core testing only) * Removed any MAPI Edge sub/unsub/receive support code that was there (will add in next PR) * Hook MessageSeenEvent to Kernel API bus * Fix MAPI vs Kernel API unique relay handler support * RecvService delegating topic subs to SubscriptionManager * RecvService emits MessageReceivedEvent (fully filtered) * Rename old SubscriptionManager to LegacySubscriptionManager
165 lines
5.4 KiB
Nim
165 lines
5.4 KiB
Nim
import std/[sets, tables, options, strutils], chronos, chronicles, results
|
|
import
|
|
waku/[
|
|
waku_core,
|
|
waku_core/topics,
|
|
waku_core/topics/sharding,
|
|
waku_node,
|
|
waku_relay,
|
|
common/broker/broker_context,
|
|
events/delivery_events,
|
|
]
|
|
|
|
type SubscriptionManager* = ref object of RootObj
|
|
node: WakuNode
|
|
contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
|
|
## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only.
|
|
## A present key with an empty HashSet value means pubsubtopic already subscribed
|
|
## (via subscribePubsubTopics()) but there's no specific content topic interest yet.
|
|
|
|
proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T =
|
|
SubscriptionManager(
|
|
node: node, contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]]()
|
|
)
|
|
|
|
proc addContentTopicInterest(
|
|
self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
|
|
): Result[void, string] =
|
|
if not self.contentTopicSubs.hasKey(shard):
|
|
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
|
|
|
|
self.contentTopicSubs.withValue(shard, cTopics):
|
|
if not cTopics[].contains(topic):
|
|
cTopics[].incl(topic)
|
|
|
|
# TODO: Call a "subscribe(shard, topic)" on filter client here,
|
|
# so the filter client can know that subscriptions changed.
|
|
|
|
return ok()
|
|
|
|
proc removeContentTopicInterest(
|
|
self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
|
|
): Result[void, string] =
|
|
self.contentTopicSubs.withValue(shard, cTopics):
|
|
if cTopics[].contains(topic):
|
|
cTopics[].excl(topic)
|
|
|
|
if cTopics[].len == 0 and isNil(self.node.wakuRelay):
|
|
self.contentTopicSubs.del(shard) # We're done with cTopics here
|
|
|
|
# TODO: Call a "unsubscribe(shard, topic)" on filter client here,
|
|
# so the filter client can know that subscriptions changed.
|
|
|
|
return ok()
|
|
|
|
proc subscribePubsubTopics(
|
|
self: SubscriptionManager, shards: seq[PubsubTopic]
|
|
): Result[void, string] =
|
|
if isNil(self.node.wakuRelay):
|
|
return err("subscribePubsubTopics requires a Relay")
|
|
|
|
var errors: seq[string] = @[]
|
|
|
|
for shard in shards:
|
|
if not self.contentTopicSubs.hasKey(shard):
|
|
self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr:
|
|
errors.add("shard " & shard & ": " & error)
|
|
continue
|
|
|
|
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
|
|
|
|
if errors.len > 0:
|
|
return err("subscribeShard errors: " & errors.join("; "))
|
|
|
|
return ok()
|
|
|
|
proc startSubscriptionManager*(self: SubscriptionManager) =
|
|
if isNil(self.node.wakuRelay):
|
|
return
|
|
|
|
if self.node.wakuAutoSharding.isSome():
|
|
# Subscribe relay to all shards in autosharding.
|
|
let autoSharding = self.node.wakuAutoSharding.get()
|
|
let clusterId = autoSharding.clusterId
|
|
let numShards = autoSharding.shardCountGenZero
|
|
|
|
if numShards > 0:
|
|
var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)
|
|
|
|
for i in 0 ..< numShards:
|
|
let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
|
|
clusterPubsubTopics.add(PubsubTopic($shardObj))
|
|
|
|
self.subscribePubsubTopics(clusterPubsubTopics).isOkOr:
|
|
error "Failed to auto-subscribe Relay to cluster shards: ", error = error
|
|
else:
|
|
info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe."
|
|
|
|
proc stopSubscriptionManager*(self: SubscriptionManager) {.async.} =
|
|
discard
|
|
|
|
proc getActiveSubscriptions*(
|
|
self: SubscriptionManager
|
|
): seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] =
|
|
var activeSubs: seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] =
|
|
@[]
|
|
|
|
for pubsub, cTopicSet in self.contentTopicSubs.pairs:
|
|
if cTopicSet.len > 0:
|
|
var cTopicSeq = newSeqOfCap[ContentTopic](cTopicSet.len)
|
|
for t in cTopicSet:
|
|
cTopicSeq.add(t)
|
|
activeSubs.add((pubsub, cTopicSeq))
|
|
|
|
return activeSubs
|
|
|
|
proc getShardForContentTopic(
|
|
self: SubscriptionManager, topic: ContentTopic
|
|
): Result[PubsubTopic, string] =
|
|
if self.node.wakuAutoSharding.isSome():
|
|
let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
|
|
return ok($shardObj)
|
|
|
|
return err("SubscriptionManager requires AutoSharding")
|
|
|
|
proc isSubscribed*(
|
|
self: SubscriptionManager, topic: ContentTopic
|
|
): Result[bool, string] =
|
|
let shard = ?self.getShardForContentTopic(topic)
|
|
return ok(
|
|
self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(topic)
|
|
)
|
|
|
|
proc isSubscribed*(
|
|
self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
|
|
): bool {.raises: [].} =
|
|
self.contentTopicSubs.withValue(shard, cTopics):
|
|
return cTopics[].contains(contentTopic)
|
|
return false
|
|
|
|
proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] =
|
|
if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
|
|
return err("SubscriptionManager requires either Relay or Filter Client.")
|
|
|
|
let shard = ?self.getShardForContentTopic(topic)
|
|
|
|
if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard):
|
|
?self.subscribePubsubTopics(@[shard])
|
|
|
|
?self.addContentTopicInterest(shard, topic)
|
|
|
|
return ok()
|
|
|
|
proc unsubscribe*(
|
|
self: SubscriptionManager, topic: ContentTopic
|
|
): Result[void, string] =
|
|
if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
|
|
return err("SubscriptionManager requires either Relay or Filter Client.")
|
|
|
|
let shard = ?self.getShardForContentTopic(topic)
|
|
|
|
if self.isSubscribed(shard, topic):
|
|
?self.removeContentTopicInterest(shard, topic)
|
|
|
|
return ok()
|