Implement sub/unsub/receive

* Apply insights and constraints from early reviews
* Add support for Core/Relay sub/unsub/receive
* WIP Edge support (new EdgeDriver placeholder)
* Hook MessageReceivedInternalEvent to Kernel API bus
* Fix MAPI vs Kernel API unique relay handler support
* RecvService delegating topic subs to SubscriptionService
* SubscriptionService abstracts Core vs. Edge switching
* RecvService emits MessageReceivedEvent (fully filtered)
* Delete SubscriptionService.shardSubs
* Track relay shard sub with empty contentTopicSubs val
* Ensure relay shards never unsubscribed for Core for now
This commit is contained in:
Fabiana Cecin 2026-02-24 19:48:34 -03:00
parent 4fc13e9afc
commit fd6370f3fa
No known key found for this signature in database
GPG Key ID: BCAB8A55CB51B6C7
9 changed files with 472 additions and 220 deletions

View File

@ -6,10 +6,20 @@ import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto]
import ../testlib/[common, wakucore, wakunode, testasync]
import
waku, waku/[waku_node, waku_core, common/broker/broker_context, events/message_events]
waku,
waku/[
waku_node,
waku_core,
common/broker/broker_context,
events/message_events,
waku_relay/protocol,
]
import waku/api/api_conf, waku/factory/waku_conf
# TODO: Edge testing (after EdgeDriver is completed)
const TestTimeout = chronos.seconds(10)
const NegativeTestTimeout = chronos.seconds(2)
const DefaultShard = PubsubTopic("/waku/2/rs/1/0")
type ReceiveEventListenerManager = ref object
@ -69,26 +79,25 @@ proc setupSubscriberNode(conf: NodeConfig): Future[Waku] {.async.} =
(await startWaku(addr node)).expect("Failed to start subscriber node")
return node
proc waitForMesh*(node: WakuNode, shard: PubsubTopic) {.async.} =
for _ in 0 ..< 50:
if node.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0:
return
await sleepAsync(100.milliseconds)
raise newException(ValueError, "GossipSub Mesh failed to stabilize")
proc publishWhenMeshReady(
publisher: WakuNode,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic,
payload: seq[byte],
maxRetries: int = 50,
retryDelay: Duration = 200.milliseconds,
): Future[Result[int, string]] {.async.} =
for _ in 0 ..< maxRetries:
let msg = WakuMessage(
payload: payload, contentTopic: contentTopic, version: 0, timestamp: now()
)
await waitForMesh(publisher, pubsubTopic)
let publishRes = await publisher.publish(some(pubsubTopic), msg)
if publishRes.isOk() and publishRes.value > 0:
return publishRes
await sleepAsync(retryDelay)
return err("Timed out waiting for mesh")
let msg = WakuMessage(
payload: payload, contentTopic: contentTopic, version: 0, timestamp: now()
)
return await publisher.publish(some(pubsubTopic), msg)
suite "Messaging API, SubscriptionService":
var
@ -111,9 +120,7 @@ suite "Messaging API, SubscriptionService":
publisherPeerInfo = publisherNode.peerInfo.toRemotePeerInfo()
publisherPeerId = publisherNode.peerInfo.peerId
proc dummyHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
discard
publisherNode.subscribe((kind: PubsubSub, topic: DefaultShard), dummyHandler).expect(
@ -142,19 +149,171 @@ suite "Messaging API, SubscriptionService":
defer:
eventManager.teardown()
const testMessageStr = "Hello, world!"
let msgPayload = testMessageStr.toBytes()
discard (
await publishWhenMeshReady(
publisherNode, DefaultShard, testTopic, "Hello, world!".toBytes()
)
).expect("Publish failed")
require await eventManager.waitForEvents(TestTimeout)
require eventManager.receivedMessages.len == 1
check eventManager.receivedMessages[0].contentTopic == testTopic
asyncTest "Subscription API, relay node ignores unsubscribed content topics on same shard":
subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core))
await subscriberNode.node.connectToNodes(@[publisherPeerInfo])
let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto")
let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto")
(await subscriberNode.subscribe(subbedTopic)).expect("failed to subscribe")
let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
defer:
eventManager.teardown()
discard (
await publishWhenMeshReady(publisherNode, DefaultShard, testTopic, msgPayload)
).expect("Timed out waiting for mesh to stabilize")
await publishWhenMeshReady(
publisherNode, DefaultShard, ignoredTopic, "Ghost Msg".toBytes()
)
).expect("Publish failed")
let receivedInTime = await eventManager.waitForEvents(TestTimeout)
check not await eventManager.waitForEvents(NegativeTestTimeout)
check eventManager.receivedMessages.len == 0
# Hard abort if these conditions aren't met to prevent an IndexDefect below
require receivedInTime
asyncTest "Subscription API, relay node unsubscribe stops message receipt":
subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core))
await subscriberNode.node.connectToNodes(@[publisherPeerInfo])
let testTopic = ContentTopic("/waku/2/unsub-test/proto")
(await subscriberNode.subscribe(testTopic)).expect("failed to subscribe")
subscriberNode.unsubscribe(testTopic).expect("failed to unsubscribe")
let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
defer:
eventManager.teardown()
discard (
await publishWhenMeshReady(
publisherNode, DefaultShard, testTopic, "Should be dropped".toBytes()
)
).expect("Publish failed")
check not await eventManager.waitForEvents(NegativeTestTimeout)
check eventManager.receivedMessages.len == 0
asyncTest "Subscription API, overlapping topics on same shard maintain correct isolation":
subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core))
await subscriberNode.node.connectToNodes(@[publisherPeerInfo])
let topicA = ContentTopic("/waku/2/topic-a/proto")
let topicB = ContentTopic("/waku/2/topic-b/proto")
(await subscriberNode.subscribe(topicA)).expect("failed to sub A")
(await subscriberNode.subscribe(topicB)).expect("failed to sub B")
let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
defer:
eventManager.teardown()
await waitForMesh(publisherNode, DefaultShard)
subscriberNode.unsubscribe(topicA).expect("failed to unsub A")
discard (
await publisherNode.publish(
some(DefaultShard),
WakuMessage(
payload: "Dropped Message".toBytes(),
contentTopic: topicA,
version: 0,
timestamp: now(),
),
)
).expect("Publish A failed")
discard (
await publisherNode.publish(
some(DefaultShard),
WakuMessage(
payload: "Kept Msg".toBytes(),
contentTopic: topicB,
version: 0,
timestamp: now(),
),
)
).expect("Publish B failed")
require await eventManager.waitForEvents(TestTimeout)
require eventManager.receivedMessages.len == 1
check eventManager.receivedMessages[0].contentTopic == topicB
let receivedMsg = eventManager.receivedMessages[0]
check receivedMsg.contentTopic == testTopic
check string.fromBytes(receivedMsg.payload) == testMessageStr
asyncTest "Subscription API, redundant subs tolerated and subs are removed":
subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core))
await subscriberNode.node.connectToNodes(@[publisherPeerInfo])
let glitchTopic = ContentTopic("/waku/2/glitch/proto")
(await subscriberNode.subscribe(glitchTopic)).expect("failed to sub")
(await subscriberNode.subscribe(glitchTopic)).expect("failed to double sub")
subscriberNode.unsubscribe(glitchTopic).expect("failed to unsub")
let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
defer:
eventManager.teardown()
discard (
await publishWhenMeshReady(
publisherNode, DefaultShard, glitchTopic, "Ghost Msg".toBytes()
)
).expect("Publish failed")
check not await eventManager.waitForEvents(NegativeTestTimeout)
check eventManager.receivedMessages.len == 0
asyncTest "Subscription API, resubscribe to an unsubscribed topic":
subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core))
await subscriberNode.node.connectToNodes(@[publisherPeerInfo])
let testTopic = ContentTopic("/waku/2/resub-test/proto")
# Subscribe
(await subscriberNode.subscribe(testTopic)).expect("Initial sub failed")
var eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
discard (
await publishWhenMeshReady(
publisherNode, DefaultShard, testTopic, "Msg 1".toBytes()
)
).expect("Pub 1 failed")
require await eventManager.waitForEvents(TestTimeout)
eventManager.teardown()
# Unsubscribe and verify teardown
subscriberNode.unsubscribe(testTopic).expect("Unsub failed")
eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
discard (
await publisherNode.publish(
some(DefaultShard),
WakuMessage(
payload: "Ghost".toBytes(),
contentTopic: testTopic,
version: 0,
timestamp: now(),
),
)
).expect("Ghost pub failed")
check not await eventManager.waitForEvents(NegativeTestTimeout)
eventManager.teardown()
# Resubscribe
(await subscriberNode.subscribe(testTopic)).expect("Resub failed")
eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
discard (
await publishWhenMeshReady(
publisherNode, DefaultShard, testTopic, "Msg 2".toBytes()
)
).expect("Pub 2 failed")
require await eventManager.waitForEvents(TestTimeout)
check eventManager.receivedMessages[0].payload == "Msg 2".toBytes()

View File

@ -53,9 +53,9 @@ proc send*(
.valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
let subRes = w.deliveryService.subscriptionService.subscribe(envelope.contentTopic)
if subRes.isErr():
warn "Failed to auto-subscribe", error = subRes.error
w.deliveryService.subscriptionService.subscribe(envelope.contentTopic).isOkOr:
warn "Failed to auto-subscribe", error = error
return err("Failed to auto-subscribe before sending: " & error)
let requestId = RequestId.new(w.rng)

View File

@ -1,6 +1,4 @@
import waku/common/broker/event_broker
import waku/api/types
import waku/waku_core/message
import waku/[api/types, waku_core/message, waku_core/topics, common/broker/event_broker]
export types
@ -28,3 +26,9 @@ EventBroker:
type MessageReceivedEvent* = object
messageHash*: string
message*: WakuMessage
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageReceivedInternalEvent* = object
topic*: PubsubTopic
message*: WakuMessage

View File

@ -2,7 +2,7 @@
## receive and is backed by store-v3 requests to get an additional degree of certainty
##
import std/[tables, sequtils, options]
import std/[tables, sequtils, options, sets]
import chronos, chronicles, libp2p/utility
import ../[subscription_service]
import
@ -13,6 +13,7 @@ import
waku_filter_v2/client,
waku_core/topics,
events/delivery_events,
events/message_events,
waku_node,
common/broker/broker_context,
]
@ -35,13 +36,8 @@ type RecvMessage = object
type RecvService* = ref object of RootObj
brokerCtx: BrokerContext
topicsInterest: Table[PubsubTopic, seq[ContentTopic]]
## Tracks message verification requests and when was the last time a
## pubsub topic was verified for missing messages
## The key contains pubsub-topics
node: WakuNode
onSubscribeListener: OnFilterSubscribeEventListener
onUnsubscribeListener: OnFilterUnsubscribeEventListener
internalMsgListener: MessageReceivedInternalEventListener
subscriptionService: SubscriptionService
recentReceivedMsgs: seq[RecvMessage]
@ -95,20 +91,20 @@ proc msgChecker(self: RecvService) {.async.} =
self.endTimeToCheck = getNowInNanosecondTime()
var msgHashesInStore = newSeq[WakuMessageHash](0)
for pubsubTopic, cTopics in self.topicsInterest.pairs:
for sub in self.subscriptionService.getActiveSubscriptions():
let storeResp: StoreQueryResponse = (
await self.node.wakuStoreClient.queryToAny(
StoreQueryRequest(
includeData: false,
pubsubTopic: some(PubsubTopic(pubsubTopic)),
contentTopics: cTopics,
pubsubTopic: some(PubsubTopic(sub.pubsubTopic)),
contentTopics: sub.contentTopics,
startTime: some(self.startTimeToCheck - DelayExtra.nanos),
endTime: some(self.endTimeToCheck + DelayExtra.nanos),
)
)
).valueOr:
error "msgChecker failed to get remote msgHashes",
pubsubTopic, cTopics, error = $error
pubsubTopic = sub.pubsubTopic, cTopics = sub.contentTopics, error = $error
continue
msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash))
@ -133,29 +129,18 @@ proc msgChecker(self: RecvService) {.async.} =
## update next check times
self.startTimeToCheck = self.endTimeToCheck
proc onSubscribe(
self: RecvService, pubsubTopic: string, contentTopics: seq[string]
) {.gcsafe, raises: [].} =
info "onSubscribe", pubsubTopic, contentTopics
self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest):
contentTopicsOfInterest[].add(contentTopics)
do:
self.topicsInterest[pubsubTopic] = contentTopics
proc processIncomingMessageOfInterest(
self: RecvService, pubsubTopic: string, message: WakuMessage
) =
## Resolve an incoming network message that was already filtered by topic.
## Deduplicate (by hash), store (saves in recently-seen messages) and emit
## the MAPI MessageReceivedEvent for every unique incoming message.
proc onUnsubscribe(
self: RecvService, pubsubTopic: string, contentTopics: seq[string]
) {.gcsafe, raises: [].} =
info "onUnsubscribe", pubsubTopic, contentTopics
self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest):
let remainingCTopics =
contentTopicsOfInterest[].filterIt(not contentTopics.contains(it))
contentTopicsOfInterest[] = remainingCTopics
if remainingCTopics.len == 0:
self.topicsInterest.del(pubsubTopic)
do:
error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics
let msgHash = computeMessageHash(pubsubTopic, message)
if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash):
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
self.recentReceivedMsgs.add(rxMsg)
MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message)
proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T =
## The storeClient will help to acquire any possible missed messages
@ -166,7 +151,6 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T =
startTimeToCheck: now,
brokerCtx: node.brokerCtx,
subscriptionService: s,
topicsInterest: initTable[PubsubTopic, seq[ContentTopic]](),
recentReceivedMsgs: @[],
)
@ -175,10 +159,7 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T =
pubsubTopic: PubsubTopic, message: WakuMessage
) {.async, closure.} =
## Captures all the messages received through filter
let msgHash = computeMessageHash(pubSubTopic, message)
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
recvService.recentReceivedMsgs.add(rxMsg)
recvService.processIncomingMessageOfInterest(pubSubTopic, message)
node.wakuFilterClient.registerPushHandler(filterPushHandler)
@ -194,25 +175,21 @@ proc startRecvService*(self: RecvService) =
self.msgCheckerHandler = self.msgChecker()
self.msgPrunerHandler = self.loopPruneOldMessages()
self.onSubscribeListener = OnFilterSubscribeEvent.listen(
self.internalMsgListener = MessageReceivedInternalEvent.listen(
self.brokerCtx,
proc(subsEv: OnFilterSubscribeEvent) {.async: (raises: []).} =
self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics),
).valueOr:
error "Failed to set OnFilterSubscribeEvent listener", error = error
quit(QuitFailure)
proc(event: MessageReceivedInternalEvent) {.async: (raises: []).} =
if not self.subscriptionService.isSubscribed(
event.topic, event.message.contentTopic
):
return
self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen(
self.brokerCtx,
proc(subsEv: OnFilterUnsubscribeEvent) {.async: (raises: []).} =
self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics),
self.processIncomingMessageOfInterest(event.topic, event.message),
).valueOr:
error "Failed to set OnFilterUnsubscribeEvent listener", error = error
error "Failed to set MessageReceivedInternalEvent listener", error = error
quit(QuitFailure)
proc stopRecvService*(self: RecvService) {.async.} =
OnFilterSubscribeEvent.dropListener(self.brokerCtx, self.onSubscribeListener)
OnFilterUnsubscribeEvent.dropListener(self.brokerCtx, self.onUnsubscribeListener)
MessageReceivedInternalEvent.dropListener(self.brokerCtx, self.internalMsgListener)
if not self.msgCheckerHandler.isNil():
await self.msgCheckerHandler.cancelAndWait()
if not self.msgPrunerHandler.isNil():

View File

@ -4,39 +4,109 @@ import
waku_core,
waku_core/topics,
waku_core/topics/sharding,
events/message_events,
waku_node,
waku_relay,
common/broker/broker_context,
events/delivery_events,
node/edge_driver,
]
type SubscriptionService* = ref object of RootObj
node: WakuNode
shardSubs: HashSet[PubsubTopic]
contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
relayHandler: WakuRelayHandler
## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only.
## A present key with an empty HashSet value means pubsubtopic already subscribed
## (via subscribeShard()) but there's no specific content topic interest yet.
filterSubListener: OnFilterSubscribeEventListener
filterUnsubListener: OnFilterUnsubscribeEventListener
proc new*(T: typedesc[SubscriptionService], node: WakuNode): T =
let service = SubscriptionService(
node: node,
shardSubs: initHashSet[PubsubTopic](),
contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]](),
SubscriptionService(
node: node, contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]]()
)
service.relayHandler = proc(
topic: PubsubTopic, msg: WakuMessage
) {.async.} =
if not service.contentTopicSubs.hasKey(topic) or
not service.contentTopicSubs[topic].contains(msg.contentTopic):
return
proc addContentTopicInterest(
self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic
) =
try:
if not self.contentTopicSubs.hasKey(shard):
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
let msgHash = computeMessageHash(topic, msg).to0xHex()
info "MessageReceivedEvent",
pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash
if not self.contentTopicSubs[shard].contains(topic):
self.contentTopicSubs[shard].incl(topic)
MessageReceivedEvent.emit(service.node.brokerCtx, msgHash, msg)
# Always notify EdgeDriver if filter is mounted
if not isNil(self.node.wakuFilterClient):
self.node.edgeDriver.subscribe(shard, topic)
except KeyError:
discard
return service
proc removeContentTopicInterest(
self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic
) =
try:
if self.contentTopicSubs.hasKey(shard) and
self.contentTopicSubs[shard].contains(topic):
self.contentTopicSubs[shard].excl(topic)
# Only delete the shard tracking if we are not running a Relay.
# If Relay is mounted, we keep the empty HashSet to signal the relay shard sub.
if self.contentTopicSubs[shard].len == 0 and isNil(self.node.wakuRelay):
self.contentTopicSubs.del(shard)
if not isNil(self.node.wakuFilterClient):
self.node.edgeDriver.unsubscribe(shard, topic)
except KeyError:
discard
proc startProvidersAndListeners*(self: SubscriptionService): Result[void, string] =
self.filterSubListener = OnFilterSubscribeEvent.listen(
self.node.brokerCtx,
proc(event: OnFilterSubscribeEvent) {.async: (raises: []), gcsafe.} =
for cTopic in event.contentTopics:
self.addContentTopicInterest(event.pubsubTopic, cTopic),
).valueOr:
return
err("SubscriptionService failed to listen to OnFilterSubscribeEvent: " & error)
self.filterUnsubListener = OnFilterUnsubscribeEvent.listen(
self.node.brokerCtx,
proc(event: OnFilterUnsubscribeEvent) {.async: (raises: []), gcsafe.} =
for cTopic in event.contentTopics:
self.removeContentTopicInterest(event.pubsubTopic, cTopic),
).valueOr:
return
err("SubscriptionService failed to listen to OnFilterUnsubscribeEvent: " & error)
return ok()
proc stopProvidersAndListeners*(self: SubscriptionService) =
OnFilterSubscribeEvent.dropListener(self.node.brokerCtx, self.filterSubListener)
OnFilterUnsubscribeEvent.dropListener(self.node.brokerCtx, self.filterUnsubListener)
proc start*(self: SubscriptionService) =
self.startProvidersAndListeners().isOkOr:
error "Fatal error in SubscriptionService.startProvidersAndListeners(): ",
error = error
raise newException(ValueError, "SubscriptionService.start() failed: " & error)
proc stop*(self: SubscriptionService) =
self.stopProvidersAndListeners()
proc getActiveSubscriptions*(
self: SubscriptionService
): seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] =
var activeSubs: seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] =
@[]
for pubsub, cTopicSet in self.contentTopicSubs.pairs:
if cTopicSet.len > 0:
var cTopicSeq = newSeqOfCap[ContentTopic](cTopicSet.len)
for t in cTopicSet:
cTopicSeq.add(t)
activeSubs.add((pubsub, cTopicSeq))
return activeSubs
proc getShardForContentTopic(
self: SubscriptionService, topic: ContentTopic
@ -45,123 +115,89 @@ proc getShardForContentTopic(
let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
return ok($shardObj)
return
err("Manual sharding is not supported in this API. Autosharding must be enabled.")
proc doSubscribe(self: SubscriptionService, shard: PubsubTopic): Result[void, string] =
self.node.subscribe((kind: PubsubSub, topic: shard), self.relayHandler).isOkOr:
error "Failed to subscribe to Relay shard", shard = shard, error = error
return err("Failed to subscribe: " & error)
return ok()
proc doUnsubscribe(
self: SubscriptionService, shard: PubsubTopic
): Result[void, string] =
self.node.unsubscribe((kind: PubsubUnsub, topic: shard)).isOkOr:
error "Failed to unsubscribe from Relay shard", shard = shard, error = error
return err("Failed to unsubscribe: " & error)
return ok()
return err("SubscriptionService requires AutoSharding")
proc isSubscribed*(
self: SubscriptionService, topic: ContentTopic
): Result[bool, string] =
if self.node.wakuRelay.isNil():
return err("SubscriptionService currently only supports Relay (Core) mode.")
let shard = ?self.getShardForContentTopic(topic)
return ok(
self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(topic)
)
if self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(
topic
):
return ok(true)
return ok(false)
proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] =
if self.node.wakuRelay.isNil():
return err("SubscriptionService currently only supports Relay (Core) mode.")
let shard = ?self.getShardForContentTopic(topic)
let needShardSub =
not self.shardSubs.contains(shard) and not self.contentTopicSubs.hasKey(shard)
if needShardSub:
?self.doSubscribe(shard)
self.contentTopicSubs.mgetOrPut(shard, initHashSet[ContentTopic]()).incl(topic)
return ok()
proc unsubscribe*(
self: SubscriptionService, topic: ContentTopic
): Result[void, string] =
if self.node.wakuRelay.isNil():
return err("SubscriptionService currently only supports Relay (Core) mode.")
let shard = ?self.getShardForContentTopic(topic)
if self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(
topic
):
let isLastTopic = self.contentTopicSubs[shard].len == 1
let needShardUnsub = isLastTopic and not self.shardSubs.contains(shard)
if needShardUnsub:
?self.doUnsubscribe(shard)
self.contentTopicSubs[shard].excl(topic)
if self.contentTopicSubs[shard].len == 0:
self.contentTopicSubs.del(shard)
return ok()
proc isSubscribed*(
self: SubscriptionService, shard: PubsubTopic, contentTopic: ContentTopic
): bool {.raises: [].} =
try:
return
self.contentTopicSubs.hasKey(shard) and
self.contentTopicSubs[shard].contains(contentTopic)
except KeyError:
discard
proc subscribeShard*(
self: SubscriptionService, shards: seq[PubsubTopic]
): Result[void, string] =
if self.node.wakuRelay.isNil():
return err("SubscriptionService currently only supports Relay (Core) mode.")
if isNil(self.node.wakuRelay):
return err("subscribeShard requires a Relay")
var errors: seq[string] = @[]
for shard in shards:
if not self.shardSubs.contains(shard):
let needShardSub = not self.contentTopicSubs.hasKey(shard)
if not self.contentTopicSubs.hasKey(shard):
self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr:
errors.add("shard " & shard & ": " & error)
continue
if needShardSub:
let res = self.doSubscribe(shard)
if res.isErr():
errors.add("Shard " & shard & " failed: " & res.error)
continue
self.shardSubs.incl(shard)
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
if errors.len > 0:
return err("Batch subscribe had errors: " & errors.join("; "))
return err("subscribeShard errors: " & errors.join("; "))
return ok()
proc unsubscribeShard*(
self: SubscriptionService, shards: seq[PubsubTopic]
): Result[void, string] =
if self.node.wakuRelay.isNil():
return err("SubscriptionService currently only supports Relay (Core) mode.")
if isNil(self.node.wakuRelay):
return err("unsubscribeShard requires a Relay")
var errors: seq[string] = @[]
for shard in shards:
if self.shardSubs.contains(shard):
let needShardUnsub = not self.contentTopicSubs.hasKey(shard)
if self.contentTopicSubs.hasKey(shard):
self.node.unsubscribe((kind: PubsubUnsub, topic: shard)).isOkOr:
errors.add("shard " & shard & ": " & error)
if needShardUnsub:
let res = self.doUnsubscribe(shard)
if res.isErr():
errors.add("Shard " & shard & " failed: " & res.error)
continue
self.shardSubs.excl(shard)
self.contentTopicSubs.del(shard)
if errors.len > 0:
return err("Batch unsubscribe had errors: " & errors.join("; "))
return err("unsubscribeShard errors: " & errors.join("; "))
return ok()
proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] =
if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
return err("SubscriptionService requires either Relay or Filter Client.")
let shard = ?self.getShardForContentTopic(topic)
if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard):
?self.subscribeShard(@[shard])
self.addContentTopicInterest(shard, topic)
return ok()
proc unsubscribe*(
self: SubscriptionService, topic: ContentTopic
): Result[void, string] =
if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
return err("SubscriptionService requires either Relay or Filter Client.")
let shard = ?self.getShardForContentTopic(topic)
if self.isSubscribed(shard, topic):
self.removeContentTopicInterest(shard, topic)
return ok()

View File

@ -0,0 +1,3 @@
import ./edge_driver/edge_driver
export edge_driver

View File

@ -0,0 +1,28 @@
{.push raises: [].}
import chronicles, waku/waku_core/topics
# Plan:
# - drive the continuous fulfillment and healing of edge peering and topic subscriptions
# - offload the edgeXXX stuff from WakuNode into this and finish it
type EdgeDriver* = ref object of RootObj # TODO: bg worker, ...
proc new*(T: typedesc[EdgeDriver]): T =
return EdgeDriver()
proc start*(self: EdgeDriver) =
# TODO
debug "TODO: EdgeDriver: start bg worker"
proc stop*(self: EdgeDriver) =
# TODO
debug "TODO: EdgeDriver: stop bg worker"
proc subscribe*(self: EdgeDriver, shard: PubsubTopic, topic: ContentTopic) =
# TODO: this is an event that can be used to drive an event-driven edge health checker
debug "TODO: EdgeDriver: got subscribe notification", shard = shard, topic = topic
proc unsubscribe*(self: EdgeDriver, shard: PubsubTopic, topic: ContentTopic) =
# TODO: this is an event that can be used to drive an event-driven edge health checker
debug "TODO: EdgeDriver: got unsubscribe notification", shard = shard, topic = topic

View File

@ -19,16 +19,20 @@ import
libp2p/utility
import
../waku_node,
../../waku_relay,
../../waku_core,
../../waku_core/topics/sharding,
../../waku_filter_v2,
../../waku_archive_legacy,
../../waku_archive,
../../waku_store_sync,
../peer_manager,
../../waku_rln_relay
waku/[
waku_relay,
waku_core,
waku_core/topics/sharding,
waku_filter_v2,
waku_archive_legacy,
waku_archive,
waku_store_sync,
waku_rln_relay,
node/waku_node,
node/peer_manager,
common/broker/broker_context,
events/message_events,
]
export waku_relay.WakuRelayHandler
@ -44,14 +48,25 @@ logScope:
## Waku relay
proc registerRelayHandler(
node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler
) =
node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler = nil
): bool =
## Registers the only handler for the given topic.
## Notice that this handler internally calls other handlers, such as filter,
## archive, etc, plus the handler provided by the application.
## Returns `true` if a mesh subscription was created or `false` if the relay
## was already subscribed to the topic.
if node.wakuRelay.isSubscribed(topic):
return
let alreadySubscribed = node.wakuRelay.isSubscribed(topic)
if not appHandler.isNil():
if not alreadySubscribed or not node.legacyAppHandlers.hasKey(topic):
node.legacyAppHandlers[topic] = appHandler
else:
debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler",
topic = topic
if alreadySubscribed:
return false
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
let msgSizeKB = msg.payload.len / 1000
@ -82,6 +97,9 @@ proc registerRelayHandler(
node.wakuStoreReconciliation.messageIngress(topic, msg)
proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
MessageReceivedInternalEvent.emit(node.brokerCtx, topic, msg)
let uniqueTopicHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
@ -89,7 +107,15 @@ proc registerRelayHandler(
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
await syncHandler(topic, msg)
await appHandler(topic, msg)
await internalHandler(topic, msg)
# Call the legacy (kernel API) app handler if it exists.
# Normally, hasKey is false and the MessageReceivedInternalEvent bus (new API) is used instead.
# But we need to support legacy behavior (kernel API use), hence this.
# NOTE: We can delete `legacyAppHandlers` if instead we refactor WakuRelay to support multiple
# PubsubTopic handlers, since that's actually supported by libp2p PubSub (bigger refactor...)
if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil():
await node.legacyAppHandlers[topic](topic, msg)
node.wakuRelay.subscribe(topic, uniqueTopicHandler)
@ -115,8 +141,11 @@ proc subscribe*(
): Result[void, string] =
## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on
## this topic. WakuRelayHandler is a method that takes a topic and a Waku message.
## If `handler` is nil, the API call will subscribe to the topic in the relay mesh
## but no app handler will be registered at this time (it can be registered later with
## another call to this proc for the same gossipsub topic).
if node.wakuRelay.isNil():
if isNil(node.wakuRelay):
error "Invalid API call to `subscribe`. WakuRelay not mounted."
return err("Invalid API call to `subscribe`. WakuRelay not mounted.")
@ -124,13 +153,15 @@ proc subscribe*(
error "Failed to decode subscription event", error = error
return err("Failed to decode subscription event: " & error)
if node.wakuRelay.isSubscribed(pubsubTopic):
warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic
return ok()
info "subscribe", pubsubTopic, contentTopicOp
node.registerRelayHandler(pubsubTopic, handler)
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
if node.registerRelayHandler(pubsubTopic, handler):
info "subscribe", pubsubTopic, contentTopicOp
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
else:
if isNil(handler):
warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic
else:
info "subscribe (was already subscribed in the mesh; appHandler set)",
pubsubTopic = pubsubTopic
return ok()
@ -138,8 +169,10 @@ proc unsubscribe*(
node: WakuNode, subscription: SubscriptionEvent
): Result[void, string] =
## Unsubscribes from a specific PubSub or Content topic.
## This will both unsubscribe from the relay mesh and remove the app handler, if any.
## NOTE: This works because using MAPI and Kernel API at the same time is unsupported.
if node.wakuRelay.isNil():
if isNil(node.wakuRelay):
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.")
@ -147,13 +180,20 @@ proc unsubscribe*(
error "Failed to decode unsubscribe event", error = error
return err("Failed to decode unsubscribe event: " & error)
if not node.wakuRelay.isSubscribed(pubsubTopic):
warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic
return ok()
let hadHandler = node.legacyAppHandlers.hasKey(pubsubTopic)
if hadHandler:
node.legacyAppHandlers.del(pubsubTopic)
info "unsubscribe", pubsubTopic, contentTopicOp
node.wakuRelay.unsubscribe(pubsubTopic)
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
if node.wakuRelay.isSubscribed(pubsubTopic):
info "unsubscribe", pubsubTopic, contentTopicOp
node.wakuRelay.unsubscribe(pubsubTopic)
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
else:
if not hadHandler:
warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic
else:
info "unsubscribe (was not subscribed in the mesh; appHandler removed)",
pubsubTopic = pubsubTopic
return ok()

View File

@ -68,6 +68,7 @@ import
],
./net_config,
./peer_manager,
./edge_driver,
./health_monitor/health_status,
./health_monitor/topic_health
@ -113,6 +114,7 @@ type
WakuNode* = ref object
peerManager*: PeerManager
switch*: Switch
edgeDriver*: EdgeDriver
wakuRelay*: WakuRelay
wakuArchive*: waku_archive.WakuArchive
wakuLegacyArchive*: waku_archive_legacy.WakuArchive
@ -144,6 +146,8 @@ type
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
rateLimitSettings*: ProtocolRateLimitSettings
legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler]
## Kernel API Relay appHandlers (if any)
wakuMix*: WakuMix
edgeTopicsHealth*: Table[PubsubTopic, TopicHealth]
edgeHealthEvent*: AsyncEvent
@ -227,6 +231,7 @@ proc new*(
let node = WakuNode(
peerManager: peerManager,
switch: switch,
edgeDriver: EdgeDriver.new(),
rng: rng,
brokerCtx: brokerCtx,
enr: enr,