Fixes from pair session with Ivan

This commit is contained in:
Fabiana Cecin 2026-02-27 10:46:56 -03:00
parent 3ee4959c9a
commit 964f0e131a
No known key found for this signature in database
GPG Key ID: BCAB8A55CB51B6C7
10 changed files with 88 additions and 183 deletions

View File

@ -16,7 +16,7 @@ import
]
import waku/api/api_conf, waku/factory/waku_conf
# TODO: Edge testing (after EdgeDriver is completed)
# TODO: Edge testing (after MAPI edge support is completed)
const TestTimeout = chronos.seconds(10)
const NegativeTestTimeout = chronos.seconds(2)

View File

@ -29,6 +29,6 @@ EventBroker:
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageReceivedInternalEvent* = object
type MessageSeenEvent* = object
topic*: PubsubTopic
message*: WakuMessage

View File

@ -420,37 +420,6 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
if not waku[].deliveryService.isNil():
waku[].deliveryService.startDeliveryService()
## Subscription Service
if not waku.node.wakuRelay.isNil() and not waku.deliveryService.isNil():
let subService = waku.deliveryService.subscriptionService
if waku.node.wakuAutoSharding.isSome():
# Subscribe relay to all shards in autosharding.
let autoSharding = waku.node.wakuAutoSharding.get()
let clusterId = autoSharding.clusterId
let numShards = autoSharding.shardCountGenZero
if numShards > 0:
var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)
for i in 0 ..< numShards:
let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
clusterPubsubTopics.add(PubsubTopic($shardObj))
subService.subscribeShard(clusterPubsubTopics).isOkOr:
return err("Failed to auto-subscribe Relay to cluster shards: " & error)
else:
# Fallback to configured shards when no autosharding.
if waku.conf.subscribeShards.len > 0:
let manualShards = waku.conf.subscribeShards.mapIt(
PubsubTopic(
$(RelayShard(clusterId: waku.conf.clusterId, shardId: uint16(it)))
)
)
subService.subscribeShard(manualShards).isOkOr:
return err("Failed to subscribe Relay to manual shards: " & error)
## Health Monitor
waku[].healthMonitor.startHealthMonitor().isOkOr:
return err("failed to start health monitor: " & $error)

View File

@ -38,9 +38,11 @@ proc new*(
)
proc startDeliveryService*(self: DeliveryService) =
self.sendService.startSendService()
self.subscriptionService.startSubscriptionService()
self.recvService.startRecvService()
self.sendService.startSendService()
proc stopDeliveryService*(self: DeliveryService) {.async.} =
await self.sendService.stopSendService()
await self.recvService.stopRecvService()
await self.subscriptionService.stopSubscriptionService()

View File

@ -37,7 +37,7 @@ type RecvMessage = object
type RecvService* = ref object of RootObj
brokerCtx: BrokerContext
node: WakuNode
internalMsgListener: MessageReceivedInternalEventListener
seenMsgListener: MessageSeenEventListener
subscriptionService: SubscriptionService
recentReceivedMsgs: seq[RecvMessage]
@ -176,9 +176,9 @@ proc startRecvService*(self: RecvService) =
self.msgCheckerHandler = self.msgChecker()
self.msgPrunerHandler = self.loopPruneOldMessages()
self.internalMsgListener = MessageReceivedInternalEvent.listen(
self.seenMsgListener = MessageSeenEvent.listen(
self.brokerCtx,
proc(event: MessageReceivedInternalEvent) {.async: (raises: []).} =
proc(event: MessageSeenEvent) {.async: (raises: []).} =
if not self.subscriptionService.isSubscribed(
event.topic, event.message.contentTopic
):
@ -186,11 +186,11 @@ proc startRecvService*(self: RecvService) =
self.processIncomingMessageOfInterest(event.topic, event.message),
).valueOr:
error "Failed to set MessageReceivedInternalEvent listener", error = error
error "Failed to set MessageSeenEvent listener", error = error
quit(QuitFailure)
proc stopRecvService*(self: RecvService) {.async.} =
MessageReceivedInternalEvent.dropListener(self.brokerCtx, self.internalMsgListener)
MessageSeenEvent.dropListener(self.brokerCtx, self.seenMsgListener)
if not self.msgCheckerHandler.isNil():
await self.msgCheckerHandler.cancelAndWait()
self.msgCheckerHandler = nil

View File

@ -8,7 +8,6 @@ import
waku_relay,
common/broker/broker_context,
events/delivery_events,
node/edge_driver,
]
type SubscriptionService* = ref object of RootObj
@ -16,9 +15,7 @@ type SubscriptionService* = ref object of RootObj
contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only.
## A present key with an empty HashSet value means pubsubtopic already subscribed
## (via subscribeShard()) but there's no specific content topic interest yet.
filterSubListener: OnFilterSubscribeEventListener
filterUnsubListener: OnFilterUnsubscribeEventListener
## (via subscribePubsubTopics()) but there's no specific content topic interest yet.
proc new*(T: typedesc[SubscriptionService], node: WakuNode): T =
SubscriptionService(
@ -27,74 +24,86 @@ proc new*(T: typedesc[SubscriptionService], node: WakuNode): T =
proc addContentTopicInterest(
self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic
) =
try:
if not self.contentTopicSubs.hasKey(shard):
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
): Result[void, string] =
if not self.contentTopicSubs.hasKey(shard):
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
if not self.contentTopicSubs[shard].contains(topic):
self.contentTopicSubs[shard].incl(topic)
self.contentTopicSubs.withValue(shard, cTopics):
if not cTopics[].contains(topic):
cTopics[].incl(topic)
# Always notify EdgeDriver if filter is mounted
if not isNil(self.node.wakuFilterClient):
self.node.edgeDriver.subscribe(shard, topic)
except KeyError:
discard
proc removeContentTopicInterest(
self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic
) =
try:
if self.contentTopicSubs.hasKey(shard) and
self.contentTopicSubs[shard].contains(topic):
self.contentTopicSubs[shard].excl(topic)
# Only delete the shard tracking if we are not running a Relay.
# If Relay is mounted, we keep the empty HashSet to signal the relay shard sub.
if self.contentTopicSubs[shard].len == 0 and isNil(self.node.wakuRelay):
self.contentTopicSubs.del(shard)
if not isNil(self.node.wakuFilterClient):
self.node.edgeDriver.unsubscribe(shard, topic)
except KeyError:
discard
proc startProvidersAndListeners*(self: SubscriptionService): Result[void, string] =
self.filterSubListener = OnFilterSubscribeEvent.listen(
self.node.brokerCtx,
proc(event: OnFilterSubscribeEvent) {.async: (raises: []), gcsafe.} =
for cTopic in event.contentTopics:
self.addContentTopicInterest(event.pubsubTopic, cTopic),
).valueOr:
return
err("SubscriptionService failed to listen to OnFilterSubscribeEvent: " & error)
self.filterUnsubListener = OnFilterUnsubscribeEvent.listen(
self.node.brokerCtx,
proc(event: OnFilterUnsubscribeEvent) {.async: (raises: []), gcsafe.} =
for cTopic in event.contentTopics:
self.removeContentTopicInterest(event.pubsubTopic, cTopic),
).valueOr:
return
err("SubscriptionService failed to listen to OnFilterUnsubscribeEvent: " & error)
# TODO: Call a "subscribe(shard, topic)" on filter client here,
# so the filter client can know that subscriptions changed.
return ok()
proc stopProvidersAndListeners*(self: SubscriptionService) =
OnFilterSubscribeEvent.dropListener(self.node.brokerCtx, self.filterSubListener)
OnFilterUnsubscribeEvent.dropListener(self.node.brokerCtx, self.filterUnsubListener)
proc removeContentTopicInterest(
self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
self.contentTopicSubs.withValue(shard, cTopics):
if cTopics[].contains(topic):
cTopics[].excl(topic)
if cTopics[].len == 0 and isNil(self.node.wakuRelay):
self.contentTopicSubs.del(shard) # We're done with cTopics here
# TODO: Call a "unsubscribe(shard, topic)" on filter client here,
# so the filter client can know that subscriptions changed.
return ok()
proc subscribePubsubTopics*(
self: SubscriptionService, shards: seq[PubsubTopic]
): Result[void, string] =
if isNil(self.node.wakuRelay):
return err("subscribeShard requires a Relay")
var errors: seq[string] = @[]
for shard in shards:
if not self.contentTopicSubs.hasKey(shard):
self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr:
errors.add("shard " & shard & ": " & error)
continue
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
if errors.len > 0:
return err("subscribeShard errors: " & errors.join("; "))
return ok()
proc startSubscriptionService*(self: SubscriptionService) =
if not isNil(self.node.wakuRelay):
if self.node.wakuAutoSharding.isSome():
# Subscribe relay to all shards in autosharding.
let autoSharding = self.node.wakuAutoSharding.get()
let clusterId = autoSharding.clusterId
let numShards = autoSharding.shardCountGenZero
if numShards > 0:
var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)
for i in 0 ..< numShards:
let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
clusterPubsubTopics.add(PubsubTopic($shardObj))
self.subscribePubsubTopics(clusterPubsubTopics).isOkOr:
error "Failed to auto-subscribe Relay to cluster shards: ", error = error
else:
# NOTE: We can't fallback to configured shards when no autosharding here since
# we don't currently have access to Waku.conf here. However, we don't support
# manual/static sharding at the MAPI level anyway so wiring that up now is not needed.
# When we no longer auto-subscribe to all shards in Core boot, we will probably
# scan the shard config due to fleet nodes; then shard conf will have to be reachable here.
# For non-fleet, interactive Core nodes (e.g. Desktop apps) this can't matter
# as much since shard subscriptions originate from subscription to content topics, but
# I guess even in that case subbing to some conf shards may make sense for some apps.
info "SubscriptionService has no AutoSharding for Relay, won't subscribe to shards by default."
proc start*(self: SubscriptionService) =
# TODO: re-enable for MAPI edge support.
#self.startProvidersAndListeners().isOkOr:
# error "Fatal error in SubscriptionService.startProvidersAndListeners(): ",
# error = error
# raise newException(ValueError, "SubscriptionService.start() failed: " & error)
discard
proc stop*(self: SubscriptionService) =
# TODO: re-enable for MAPI edge support.
#self.stopProvidersAndListeners()
proc stopSubscriptionService*(self: SubscriptionService) {.async.} =
discard
proc getActiveSubscriptions*(
@ -139,47 +148,6 @@ proc isSubscribed*(
except KeyError:
discard
proc subscribeShard*(
self: SubscriptionService, shards: seq[PubsubTopic]
): Result[void, string] =
if isNil(self.node.wakuRelay):
return err("subscribeShard requires a Relay")
var errors: seq[string] = @[]
for shard in shards:
if not self.contentTopicSubs.hasKey(shard):
self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr:
errors.add("shard " & shard & ": " & error)
continue
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
if errors.len > 0:
return err("subscribeShard errors: " & errors.join("; "))
return ok()
proc unsubscribeShard*(
self: SubscriptionService, shards: seq[PubsubTopic]
): Result[void, string] =
if isNil(self.node.wakuRelay):
return err("unsubscribeShard requires a Relay")
var errors: seq[string] = @[]
for shard in shards:
if self.contentTopicSubs.hasKey(shard):
self.node.unsubscribe((kind: PubsubUnsub, topic: shard)).isOkOr:
errors.add("shard " & shard & ": " & error)
self.contentTopicSubs.del(shard)
if errors.len > 0:
return err("unsubscribeShard errors: " & errors.join("; "))
return ok()
proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] =
if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
return err("SubscriptionService requires either Relay or Filter Client.")
@ -187,9 +155,9 @@ proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, st
let shard = ?self.getShardForContentTopic(topic)
if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard):
?self.subscribeShard(@[shard])
?self.subscribePubsubTopics(@[shard])
self.addContentTopicInterest(shard, topic)
?self.addContentTopicInterest(shard, topic)
return ok()
@ -202,6 +170,6 @@ proc unsubscribe*(
let shard = ?self.getShardForContentTopic(topic)
if self.isSubscribed(shard, topic):
self.removeContentTopicInterest(shard, topic)
?self.removeContentTopicInterest(shard, topic)
return ok()

View File

@ -1,3 +0,0 @@
import ./edge_driver/edge_driver
export edge_driver

View File

@ -1,28 +0,0 @@
{.push raises: [].}
import chronicles, waku/waku_core/topics
# Plan:
# - drive the continuous fulfillment and healing of edge peering and topic subscriptions
# - offload the edgeXXX stuff from WakuNode into this and finish it
type EdgeDriver* = ref object of RootObj # TODO: bg worker, ...
proc new*(T: typedesc[EdgeDriver]): T =
return EdgeDriver()
proc start*(self: EdgeDriver) =
# TODO
debug "TODO: EdgeDriver: start bg worker"
proc stop*(self: EdgeDriver) =
# TODO
debug "TODO: EdgeDriver: stop bg worker"
proc subscribe*(self: EdgeDriver, shard: PubsubTopic, topic: ContentTopic) =
# TODO: this is an event that can be used to drive an event-driven edge health checker
debug "TODO: EdgeDriver: got subscribe notification", shard = shard, topic = topic
proc unsubscribe*(self: EdgeDriver, shard: PubsubTopic, topic: ContentTopic) =
# TODO: this is an event that can be used to drive an event-driven edge health checker
debug "TODO: EdgeDriver: got unsubscribe notification", shard = shard, topic = topic

View File

@ -98,7 +98,7 @@ proc registerRelayHandler(
node.wakuStoreReconciliation.messageIngress(topic, msg)
proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
MessageReceivedInternalEvent.emit(node.brokerCtx, topic, msg)
MessageSeenEvent.emit(node.brokerCtx, topic, msg)
let uniqueTopicHandler = proc(
topic: PubsubTopic, msg: WakuMessage
@ -110,7 +110,7 @@ proc registerRelayHandler(
await internalHandler(topic, msg)
# Call the legacy (kernel API) app handler if it exists.
# Normally, hasKey is false and the MessageReceivedInternalEvent bus (new API) is used instead.
# Normally, hasKey is false and the MessageSeenEvent bus (new API) is used instead.
# But we need to support legacy behavior (kernel API use), hence this.
# NOTE: We can delete `legacyAppHandlers` if instead we refactor WakuRelay to support multiple
# PubsubTopic handlers, since that's actually supported by libp2p PubSub (bigger refactor...)

View File

@ -69,7 +69,6 @@ import
waku/discovery/waku_kademlia,
./net_config,
./peer_manager,
./edge_driver,
./health_monitor/health_status,
./health_monitor/topic_health
@ -115,7 +114,6 @@ type
WakuNode* = ref object
peerManager*: PeerManager
switch*: Switch
edgeDriver*: EdgeDriver
wakuRelay*: WakuRelay
wakuArchive*: waku_archive.WakuArchive
wakuLegacyArchive*: waku_archive_legacy.WakuArchive
@ -235,7 +233,6 @@ proc new*(
let node = WakuNode(
peerManager: peerManager,
switch: switch,
edgeDriver: EdgeDriver.new(),
rng: rng,
brokerCtx: brokerCtx,
enr: enr,