Implement stateful SubscriptionService for Core mode (#3732)

* SubscriptionManager tracks shard and content topic interest
* RecvService emits MessageReceivedEvent on subscribed content topics
* Route MAPI through old Kernel API relay unique-handler infra to avoid code duplication
* Encode current gen-zero network policy: on Core node boot, subscribe to all pubsub topics (all shards)
* Add test_api_subscriptions.nim (basic relay/core testing only)
* Removed any MAPI Edge sub/unsub/receive support code that was there (will add in next PR)
* Hook MessageSeenEvent to Kernel API bus
* Fix MAPI vs Kernel API unique relay handler support
* RecvService delegating topic subs to SubscriptionManager
* RecvService emits MessageReceivedEvent (fully filtered)
* Rename old SubscriptionManager to LegacySubscriptionManager
This commit is contained in:
Fabiana Cecin 2026-03-02 14:52:36 -03:00 committed by GitHub
parent ba85873f03
commit 51ec09c39d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 735 additions and 187 deletions

View File

@ -1,3 +1,8 @@
{.used.}
import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health
import
./test_entry_nodes,
./test_node_conf,
./test_api_send,
./test_api_subscription,
./test_api_health

View File

@ -0,0 +1,399 @@
{.used.}
import std/[strutils, net, options, sets]
import chronos, testutils/unittests, stew/byteutils
import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto]
import ../testlib/[common, wakucore, wakunode, testasync]
import
waku,
waku/[
waku_node,
waku_core,
common/broker/broker_context,
events/message_events,
waku_relay/protocol,
]
import waku/api/api_conf, waku/factory/waku_conf
# TODO: Edge testing (after MAPI edge support is completed)
const TestTimeout = chronos.seconds(10)
const NegativeTestTimeout = chronos.seconds(2)
type ReceiveEventListenerManager = ref object
brokerCtx: BrokerContext
receivedListener: MessageReceivedEventListener
receivedEvent: AsyncEvent
receivedMessages: seq[WakuMessage]
targetCount: int
proc newReceiveEventListenerManager(
brokerCtx: BrokerContext, expectedCount: int = 1
): ReceiveEventListenerManager =
let manager = ReceiveEventListenerManager(
brokerCtx: brokerCtx, receivedMessages: @[], targetCount: expectedCount
)
manager.receivedEvent = newAsyncEvent()
manager.receivedListener = MessageReceivedEvent
.listen(
brokerCtx,
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
manager.receivedMessages.add(event.message)
if manager.receivedMessages.len >= manager.targetCount:
manager.receivedEvent.fire()
,
)
.expect("Failed to listen to MessageReceivedEvent")
return manager
proc teardown(manager: ReceiveEventListenerManager) =
MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener)
proc waitForEvents(
manager: ReceiveEventListenerManager, timeout: Duration
): Future[bool] {.async.} =
return await manager.receivedEvent.wait().withTimeout(timeout)
type TestNetwork = ref object
publisher: WakuNode
subscriber: Waku
publisherPeerInfo: RemotePeerInfo
proc createApiNodeConf(
mode: WakuMode = WakuMode.Core, numShards: uint16 = 1
): NodeConfig =
let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0)
result = NodeConfig.init(
mode = mode,
protocolsConfig = ProtocolsConfig.init(
entryNodes = @[],
clusterId = 1,
autoShardingConfig = AutoShardingConfig(numShardsInCluster: numShards),
),
networkingConfig = netConf,
p2pReliability = true,
)
proc setupSubscriberNode(conf: NodeConfig): Future[Waku] {.async.} =
var node: Waku
lockNewGlobalBrokerContext:
node = (await createNode(conf)).expect("Failed to create subscriber node")
(await startWaku(addr node)).expect("Failed to start subscriber node")
return node
proc setupNetwork(
numShards: uint16 = 1, mode: WakuMode = WakuMode.Core
): Future[TestNetwork] {.async.} =
var net = TestNetwork()
lockNewGlobalBrokerContext:
net.publisher =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
net.publisher.mountMetadata(1, @[0'u16]).expect("Failed to mount metadata")
(await net.publisher.mountRelay()).expect("Failed to mount relay")
await net.publisher.mountLibp2pPing()
await net.publisher.start()
net.publisherPeerInfo = net.publisher.peerInfo.toRemotePeerInfo()
proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
discard
# Subscribe the publisher to all shards to guarantee a GossipSub mesh with the subscriber.
# Currently, Core/Relay nodes auto-subscribe to all network shards on boot, but if
# that changes, this will be needed to cause the publisher to have shard interest
# for any shards the subscriber may want to use, which is required for waitForMesh to work.
for i in 0 ..< numShards.int:
let shard = PubsubTopic("/waku/2/rs/1/" & $i)
net.publisher.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect(
"Failed to sub publisher"
)
net.subscriber = await setupSubscriberNode(createApiNodeConf(mode, numShards))
await net.subscriber.node.connectToNodes(@[net.publisherPeerInfo])
return net
proc teardown(net: TestNetwork) {.async.} =
if not isNil(net.subscriber):
(await net.subscriber.stop()).expect("Failed to stop subscriber node")
net.subscriber = nil
if not isNil(net.publisher):
await net.publisher.stop()
net.publisher = nil
proc getRelayShard(node: WakuNode, contentTopic: ContentTopic): PubsubTopic =
let autoSharding = node.wakuAutoSharding.get()
let shardObj = autoSharding.getShard(contentTopic).expect("Failed to get shard")
return PubsubTopic($shardObj)
proc waitForMesh(node: WakuNode, shard: PubsubTopic) {.async.} =
for _ in 0 ..< 50:
if node.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0:
return
await sleepAsync(100.milliseconds)
raise newException(ValueError, "GossipSub Mesh failed to stabilize on " & shard)
proc publishToMesh(
net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte]
): Future[Result[int, string]] {.async.} =
let shard = net.subscriber.node.getRelayShard(contentTopic)
await waitForMesh(net.publisher, shard)
let msg = WakuMessage(
payload: payload, contentTopic: contentTopic, version: 0, timestamp: now()
)
return await net.publisher.publish(some(shard), msg)
suite "Messaging API, SubscriptionManager":
asyncTest "Subscription API, relay node auto subscribe and receive message":
let net = await setupNetwork(1)
defer:
await net.teardown()
let testTopic = ContentTopic("/waku/2/test-content/proto")
(await net.subscriber.subscribe(testTopic)).expect(
"subscriberNode failed to subscribe"
)
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
defer:
eventManager.teardown()
discard (await net.publishToMesh(testTopic, "Hello, world!".toBytes())).expect(
"Publish failed"
)
require await eventManager.waitForEvents(TestTimeout)
require eventManager.receivedMessages.len == 1
check eventManager.receivedMessages[0].contentTopic == testTopic
asyncTest "Subscription API, relay node ignores unsubscribed content topics on same shard":
let net = await setupNetwork(1)
defer:
await net.teardown()
let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto")
let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto")
(await net.subscriber.subscribe(subbedTopic)).expect("failed to subscribe")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
defer:
eventManager.teardown()
discard (await net.publishToMesh(ignoredTopic, "Ghost Msg".toBytes())).expect(
"Publish failed"
)
check not await eventManager.waitForEvents(NegativeTestTimeout)
check eventManager.receivedMessages.len == 0
asyncTest "Subscription API, relay node unsubscribe stops message receipt":
let net = await setupNetwork(1)
defer:
await net.teardown()
let testTopic = ContentTopic("/waku/2/unsub-test/proto")
(await net.subscriber.subscribe(testTopic)).expect("failed to subscribe")
net.subscriber.unsubscribe(testTopic).expect("failed to unsubscribe")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
defer:
eventManager.teardown()
discard (await net.publishToMesh(testTopic, "Should be dropped".toBytes())).expect(
"Publish failed"
)
check not await eventManager.waitForEvents(NegativeTestTimeout)
check eventManager.receivedMessages.len == 0
asyncTest "Subscription API, overlapping topics on same shard maintain correct isolation":
let net = await setupNetwork(1)
defer:
await net.teardown()
let topicA = ContentTopic("/waku/2/topic-a/proto")
let topicB = ContentTopic("/waku/2/topic-b/proto")
(await net.subscriber.subscribe(topicA)).expect("failed to sub A")
(await net.subscriber.subscribe(topicB)).expect("failed to sub B")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
defer:
eventManager.teardown()
net.subscriber.unsubscribe(topicA).expect("failed to unsub A")
discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect(
"Publish A failed"
)
discard
(await net.publishToMesh(topicB, "Kept Msg".toBytes())).expect("Publish B failed")
require await eventManager.waitForEvents(TestTimeout)
require eventManager.receivedMessages.len == 1
check eventManager.receivedMessages[0].contentTopic == topicB
asyncTest "Subscription API, redundant subs tolerated and subs are removed":
let net = await setupNetwork(1)
defer:
await net.teardown()
let glitchTopic = ContentTopic("/waku/2/glitch/proto")
(await net.subscriber.subscribe(glitchTopic)).expect("failed to sub")
(await net.subscriber.subscribe(glitchTopic)).expect("failed to double sub")
net.subscriber.unsubscribe(glitchTopic).expect("failed to unsub")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
defer:
eventManager.teardown()
discard (await net.publishToMesh(glitchTopic, "Ghost Msg".toBytes())).expect(
"Publish failed"
)
check not await eventManager.waitForEvents(NegativeTestTimeout)
check eventManager.receivedMessages.len == 0
asyncTest "Subscription API, resubscribe to an unsubscribed topic":
let net = await setupNetwork(1)
defer:
await net.teardown()
let testTopic = ContentTopic("/waku/2/resub-test/proto")
# Subscribe
(await net.subscriber.subscribe(testTopic)).expect("Initial sub failed")
var eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
discard
(await net.publishToMesh(testTopic, "Msg 1".toBytes())).expect("Pub 1 failed")
require await eventManager.waitForEvents(TestTimeout)
eventManager.teardown()
# Unsubscribe and verify teardown
net.subscriber.unsubscribe(testTopic).expect("Unsub failed")
eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
discard
(await net.publishToMesh(testTopic, "Ghost".toBytes())).expect("Ghost pub failed")
check not await eventManager.waitForEvents(NegativeTestTimeout)
eventManager.teardown()
# Resubscribe
(await net.subscriber.subscribe(testTopic)).expect("Resub failed")
eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
discard
(await net.publishToMesh(testTopic, "Msg 2".toBytes())).expect("Pub 2 failed")
require await eventManager.waitForEvents(TestTimeout)
check eventManager.receivedMessages[0].payload == "Msg 2".toBytes()
asyncTest "Subscription API, two content topics in different shards":
let net = await setupNetwork(8)
defer:
await net.teardown()
var topicA = ContentTopic("/appA/2/shard-test-a/proto")
var topicB = ContentTopic("/appB/2/shard-test-b/proto")
# generate two content topics that land in two different shards
var i = 0
while net.subscriber.node.getRelayShard(topicA) ==
net.subscriber.node.getRelayShard(topicB):
topicB = ContentTopic("/appB" & $i & "/2/shard-test-b/proto")
inc i
(await net.subscriber.subscribe(topicA)).expect("failed to sub A")
(await net.subscriber.subscribe(topicB)).expect("failed to sub B")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 2)
defer:
eventManager.teardown()
discard (await net.publishToMesh(topicA, "Msg on Shard A".toBytes())).expect(
"Publish A failed"
)
discard (await net.publishToMesh(topicB, "Msg on Shard B".toBytes())).expect(
"Publish B failed"
)
require await eventManager.waitForEvents(TestTimeout)
require eventManager.receivedMessages.len == 2
asyncTest "Subscription API, many content topics in many shards":
let net = await setupNetwork(8)
defer:
await net.teardown()
var allTopics: seq[ContentTopic]
for i in 0 ..< 100:
allTopics.add(ContentTopic("/stress-app-" & $i & "/2/state-test/proto"))
var activeSubs: seq[ContentTopic]
proc verifyNetworkState(expected: seq[ContentTopic]) {.async.} =
let eventManager =
newReceiveEventListenerManager(net.subscriber.brokerCtx, expected.len)
for topic in allTopics:
discard (await net.publishToMesh(topic, "Stress Payload".toBytes())).expect(
"publish failed"
)
require await eventManager.waitForEvents(TestTimeout)
# here we just give a chance for any messages that we don't expect to arrive
await sleepAsync(1.seconds)
eventManager.teardown()
# weak check (but catches most bugs)
require eventManager.receivedMessages.len == expected.len
# strict expected receipt test
var receivedTopics = initHashSet[ContentTopic]()
for msg in eventManager.receivedMessages:
receivedTopics.incl(msg.contentTopic)
var expectedTopics = initHashSet[ContentTopic]()
for t in expected:
expectedTopics.incl(t)
check receivedTopics == expectedTopics
# subscribe to all content topics we generated
for t in allTopics:
(await net.subscriber.subscribe(t)).expect("sub failed")
activeSubs.add(t)
await verifyNetworkState(activeSubs)
# unsubscribe from some content topics
for i in 0 ..< 50:
let t = allTopics[i]
net.subscriber.unsubscribe(t).expect("unsub failed")
let idx = activeSubs.find(t)
if idx >= 0:
activeSubs.del(idx)
await verifyNetworkState(activeSubs)
# re-subscribe to some content topics
for i in 0 ..< 25:
let t = allTopics[i]
(await net.subscriber.subscribe(t)).expect("resub failed")
activeSubs.add(t)
await verifyNetworkState(activeSubs)

View File

@ -374,6 +374,12 @@ procSuite "WakuNode - Store":
waitFor allFutures(client.stop(), server.stop())
test "Store protocol queries overrun request rate limitation":
when defined(macosx):
# on macos CI, this test is resulting a code 200 (OK) instead of a 429 error
# means the runner is somehow too slow to cause a request limit failure
skip()
return
## Setup
let
serverKey = generateSecp256k1Key()

View File

@ -3,7 +3,7 @@ import chronicles, chronos, results, std/strutils
import waku/factory/waku
import waku/[requests/health_requests, waku_core, waku_node]
import waku/node/delivery_service/send_service
import waku/node/delivery_service/subscription_service
import waku/node/delivery_service/subscription_manager
import libp2p/peerid
import ./[api_conf, types]
@ -36,18 +36,27 @@ proc subscribe*(
): Future[Result[void, string]] {.async.} =
?checkApiAvailability(w)
return w.deliveryService.subscriptionService.subscribe(contentTopic)
return w.deliveryService.subscriptionManager.subscribe(contentTopic)
proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] =
?checkApiAvailability(w)
return w.deliveryService.subscriptionService.unsubscribe(contentTopic)
return w.deliveryService.subscriptionManager.unsubscribe(contentTopic)
proc send*(
w: Waku, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
?checkApiAvailability(w)
let isSubbed = w.deliveryService.subscriptionManager
.isSubscribed(envelope.contentTopic)
.valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
w.deliveryService.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
warn "Failed to auto-subscribe", error = error
return err("Failed to auto-subscribe before sending: " & error)
let requestId = RequestId.new(w.rng)
let deliveryTask = DeliveryTask.new(requestId, envelope, w.brokerCtx).valueOr:

View File

@ -1,6 +1,4 @@
import waku/common/broker/event_broker
import waku/api/types
import waku/waku_core/message
import waku/[api/types, waku_core/message, waku_core/topics, common/broker/event_broker]
export types
@ -28,3 +26,9 @@ EventBroker:
type MessageReceivedEvent* = object
messageHash*: string
message*: WakuMessage
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageSeenEvent* = object
topic*: PubsubTopic
message*: WakuMessage

View File

@ -35,6 +35,7 @@ import
node/health_monitor,
node/waku_metrics,
node/delivery_service/delivery_service,
node/delivery_service/subscription_manager,
rest_api/message_cache,
rest_api/endpoint/server,
rest_api/endpoint/builder as rest_server_builder,
@ -453,7 +454,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
).isOkOr:
error "Failed to set RequestProtocolHealth provider", error = error
## Setup RequestHealthReport provider (The lost child)
## Setup RequestHealthReport provider
RequestHealthReport.setProvider(
globalBrokerContext(),
@ -514,6 +515,10 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
if not waku.wakuDiscv5.isNil():
await waku.wakuDiscv5.stop()
if not waku.deliveryService.isNil():
await waku.deliveryService.stopDeliveryService()
waku.deliveryService = nil
if not waku.node.isNil():
await waku.node.stop()

View File

@ -5,7 +5,7 @@ import chronos
import
./recv_service,
./send_service,
./subscription_service,
./subscription_manager,
waku/[
waku_core,
waku_node,
@ -18,29 +18,31 @@ import
type DeliveryService* = ref object
sendService*: SendService
recvService: RecvService
subscriptionService*: SubscriptionService
subscriptionManager*: SubscriptionManager
proc new*(
T: type DeliveryService, useP2PReliability: bool, w: WakuNode
): Result[T, string] =
## storeClient is needed to give store visitility to DeliveryService
## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendService to re-publish
let subscriptionService = SubscriptionService.new(w)
let sendService = ?SendService.new(useP2PReliability, w, subscriptionService)
let recvService = RecvService.new(w, subscriptionService)
let subscriptionManager = SubscriptionManager.new(w)
let sendService = ?SendService.new(useP2PReliability, w, subscriptionManager)
let recvService = RecvService.new(w, subscriptionManager)
return ok(
DeliveryService(
sendService: sendService,
recvService: recvService,
subscriptionService: subscriptionService,
subscriptionManager: subscriptionManager,
)
)
proc startDeliveryService*(self: DeliveryService) =
self.sendService.startSendService()
self.subscriptionManager.startSubscriptionManager()
self.recvService.startRecvService()
self.sendService.startSendService()
proc stopDeliveryService*(self: DeliveryService) {.async.} =
self.sendService.stopSendService()
await self.sendService.stopSendService()
await self.recvService.stopRecvService()
await self.subscriptionManager.stopSubscriptionManager()

View File

@ -2,9 +2,9 @@
## receive and is backed by store-v3 requests to get an additional degree of certainty
##
import std/[tables, sequtils, options]
import std/[tables, sequtils, options, sets]
import chronos, chronicles, libp2p/utility
import ../[subscription_service]
import ../[subscription_manager]
import
waku/[
waku_core,
@ -13,6 +13,7 @@ import
waku_filter_v2/client,
waku_core/topics,
events/delivery_events,
events/message_events,
waku_node,
common/broker/broker_context,
]
@ -35,14 +36,9 @@ type RecvMessage = object
type RecvService* = ref object of RootObj
brokerCtx: BrokerContext
topicsInterest: Table[PubsubTopic, seq[ContentTopic]]
## Tracks message verification requests and when was the last time a
## pubsub topic was verified for missing messages
## The key contains pubsub-topics
node: WakuNode
onSubscribeListener: OnFilterSubscribeEventListener
onUnsubscribeListener: OnFilterUnsubscribeEventListener
subscriptionService: SubscriptionService
seenMsgListener: MessageSeenEventListener
subscriptionManager: SubscriptionManager
recentReceivedMsgs: seq[RecvMessage]
@ -95,20 +91,20 @@ proc msgChecker(self: RecvService) {.async.} =
self.endTimeToCheck = getNowInNanosecondTime()
var msgHashesInStore = newSeq[WakuMessageHash](0)
for pubsubTopic, cTopics in self.topicsInterest.pairs:
for sub in self.subscriptionManager.getActiveSubscriptions():
let storeResp: StoreQueryResponse = (
await self.node.wakuStoreClient.queryToAny(
StoreQueryRequest(
includeData: false,
pubsubTopic: some(PubsubTopic(pubsubTopic)),
contentTopics: cTopics,
pubsubTopic: some(PubsubTopic(sub.pubsubTopic)),
contentTopics: sub.contentTopics,
startTime: some(self.startTimeToCheck - DelayExtra.nanos),
endTime: some(self.endTimeToCheck + DelayExtra.nanos),
)
)
).valueOr:
error "msgChecker failed to get remote msgHashes",
pubsubTopic, cTopics, error = $error
pubsubTopic = sub.pubsubTopic, cTopics = sub.contentTopics, error = $error
continue
msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash))
@ -133,31 +129,20 @@ proc msgChecker(self: RecvService) {.async.} =
## update next check times
self.startTimeToCheck = self.endTimeToCheck
proc onSubscribe(
self: RecvService, pubsubTopic: string, contentTopics: seq[string]
) {.gcsafe, raises: [].} =
info "onSubscribe", pubsubTopic, contentTopics
self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest):
contentTopicsOfInterest[].add(contentTopics)
do:
self.topicsInterest[pubsubTopic] = contentTopics
proc processIncomingMessageOfInterest(
self: RecvService, pubsubTopic: string, message: WakuMessage
) =
## Resolve an incoming network message that was already filtered by topic.
## Deduplicate (by hash), store (saves in recently-seen messages) and emit
## the MAPI MessageReceivedEvent for every unique incoming message.
proc onUnsubscribe(
self: RecvService, pubsubTopic: string, contentTopics: seq[string]
) {.gcsafe, raises: [].} =
info "onUnsubscribe", pubsubTopic, contentTopics
let msgHash = computeMessageHash(pubsubTopic, message)
if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash):
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
self.recentReceivedMsgs.add(rxMsg)
MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message)
self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest):
let remainingCTopics =
contentTopicsOfInterest[].filterIt(not contentTopics.contains(it))
contentTopicsOfInterest[] = remainingCTopics
if remainingCTopics.len == 0:
self.topicsInterest.del(pubsubTopic)
do:
error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics
proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T =
proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T =
## The storeClient will help to acquire any possible missed messages
let now = getNowInNanosecondTime()
@ -165,22 +150,13 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T =
node: node,
startTimeToCheck: now,
brokerCtx: node.brokerCtx,
subscriptionService: s,
topicsInterest: initTable[PubsubTopic, seq[ContentTopic]](),
subscriptionManager: s,
recentReceivedMsgs: @[],
)
if not node.wakuFilterClient.isNil():
let filterPushHandler = proc(
pubsubTopic: PubsubTopic, message: WakuMessage
) {.async, closure.} =
## Captures all the messages received through filter
let msgHash = computeMessageHash(pubSubTopic, message)
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
recvService.recentReceivedMsgs.add(rxMsg)
node.wakuFilterClient.registerPushHandler(filterPushHandler)
# TODO: For MAPI Edge support, either call node.wakuFilterClient.registerPushHandler
# so that the RecvService listens to incoming filter messages,
# or have the filter client emit MessageSeenEvent.
return recvService
@ -194,26 +170,26 @@ proc startRecvService*(self: RecvService) =
self.msgCheckerHandler = self.msgChecker()
self.msgPrunerHandler = self.loopPruneOldMessages()
self.onSubscribeListener = OnFilterSubscribeEvent.listen(
self.seenMsgListener = MessageSeenEvent.listen(
self.brokerCtx,
proc(subsEv: OnFilterSubscribeEvent) {.async: (raises: []).} =
self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics),
).valueOr:
error "Failed to set OnFilterSubscribeEvent listener", error = error
quit(QuitFailure)
proc(event: MessageSeenEvent) {.async: (raises: []).} =
if not self.subscriptionManager.isSubscribed(
event.topic, event.message.contentTopic
):
trace "skipping message as I am not subscribed",
shard = event.topic, contenttopic = event.message.contentTopic
return
self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen(
self.brokerCtx,
proc(subsEv: OnFilterUnsubscribeEvent) {.async: (raises: []).} =
self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics),
self.processIncomingMessageOfInterest(event.topic, event.message),
).valueOr:
error "Failed to set OnFilterUnsubscribeEvent listener", error = error
error "Failed to set MessageSeenEvent listener", error = error
quit(QuitFailure)
proc stopRecvService*(self: RecvService) {.async.} =
OnFilterSubscribeEvent.dropListener(self.brokerCtx, self.onSubscribeListener)
OnFilterUnsubscribeEvent.dropListener(self.brokerCtx, self.onUnsubscribeListener)
MessageSeenEvent.dropListener(self.brokerCtx, self.seenMsgListener)
if not self.msgCheckerHandler.isNil():
await self.msgCheckerHandler.cancelAndWait()
self.msgCheckerHandler = nil
if not self.msgPrunerHandler.isNil():
await self.msgPrunerHandler.cancelAndWait()
self.msgPrunerHandler = nil

View File

@ -5,7 +5,7 @@ import std/[sequtils, tables, options]
import chronos, chronicles, libp2p/utility
import
./[send_processor, relay_processor, lightpush_processor, delivery_task],
../[subscription_service],
../[subscription_manager],
waku/[
waku_core,
node/waku_node,
@ -58,7 +58,7 @@ type SendService* = ref object of RootObj
node: WakuNode
checkStoreForMessages: bool
subscriptionService: SubscriptionService
subscriptionManager: SubscriptionManager
proc setupSendProcessorChain(
peerManager: PeerManager,
@ -99,7 +99,7 @@ proc new*(
T: typedesc[SendService],
preferP2PReliability: bool,
w: WakuNode,
s: SubscriptionService,
s: SubscriptionManager,
): Result[T, string] =
if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil():
return err(
@ -120,7 +120,7 @@ proc new*(
sendProcessor: sendProcessorChain,
node: w,
checkStoreForMessages: checkStoreForMessages,
subscriptionService: s,
subscriptionManager: s,
)
return ok(sendService)
@ -250,9 +250,9 @@ proc serviceLoop(self: SendService) {.async.} =
proc startSendService*(self: SendService) =
self.serviceLoopHandle = self.serviceLoop()
proc stopSendService*(self: SendService) =
proc stopSendService*(self: SendService) {.async.} =
if not self.serviceLoopHandle.isNil():
discard self.serviceLoopHandle.cancelAndWait()
await self.serviceLoopHandle.cancelAndWait()
proc send*(self: SendService, task: DeliveryTask) {.async.} =
assert(not task.isNil(), "task for send must not be nil")
@ -260,7 +260,7 @@ proc send*(self: SendService, task: DeliveryTask) {.async.} =
info "SendService.send: processing delivery task",
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
self.subscriptionService.subscribe(task.msg.contentTopic).isOkOr:
self.subscriptionManager.subscribe(task.msg.contentTopic).isOkOr:
error "SendService.send: failed to subscribe to content topic",
contentTopic = task.msg.contentTopic, error = error

View File

@ -0,0 +1,164 @@
import std/[sets, tables, options, strutils], chronos, chronicles, results
import
waku/[
waku_core,
waku_core/topics,
waku_core/topics/sharding,
waku_node,
waku_relay,
common/broker/broker_context,
events/delivery_events,
]
type SubscriptionManager* = ref object of RootObj
node: WakuNode
contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only.
## A present key with an empty HashSet value means pubsubtopic already subscribed
## (via subscribePubsubTopics()) but there's no specific content topic interest yet.
proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T =
SubscriptionManager(
node: node, contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]]()
)
proc addContentTopicInterest(
self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
if not self.contentTopicSubs.hasKey(shard):
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
self.contentTopicSubs.withValue(shard, cTopics):
if not cTopics[].contains(topic):
cTopics[].incl(topic)
# TODO: Call a "subscribe(shard, topic)" on filter client here,
# so the filter client can know that subscriptions changed.
return ok()
proc removeContentTopicInterest(
self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
self.contentTopicSubs.withValue(shard, cTopics):
if cTopics[].contains(topic):
cTopics[].excl(topic)
if cTopics[].len == 0 and isNil(self.node.wakuRelay):
self.contentTopicSubs.del(shard) # We're done with cTopics here
# TODO: Call a "unsubscribe(shard, topic)" on filter client here,
# so the filter client can know that subscriptions changed.
return ok()
proc subscribePubsubTopics(
self: SubscriptionManager, shards: seq[PubsubTopic]
): Result[void, string] =
if isNil(self.node.wakuRelay):
return err("subscribePubsubTopics requires a Relay")
var errors: seq[string] = @[]
for shard in shards:
if not self.contentTopicSubs.hasKey(shard):
self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr:
errors.add("shard " & shard & ": " & error)
continue
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
if errors.len > 0:
return err("subscribeShard errors: " & errors.join("; "))
return ok()
proc startSubscriptionManager*(self: SubscriptionManager) =
if isNil(self.node.wakuRelay):
return
if self.node.wakuAutoSharding.isSome():
# Subscribe relay to all shards in autosharding.
let autoSharding = self.node.wakuAutoSharding.get()
let clusterId = autoSharding.clusterId
let numShards = autoSharding.shardCountGenZero
if numShards > 0:
var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)
for i in 0 ..< numShards:
let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
clusterPubsubTopics.add(PubsubTopic($shardObj))
self.subscribePubsubTopics(clusterPubsubTopics).isOkOr:
error "Failed to auto-subscribe Relay to cluster shards: ", error = error
else:
info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe."
proc stopSubscriptionManager*(self: SubscriptionManager) {.async.} =
discard
proc getActiveSubscriptions*(
self: SubscriptionManager
): seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] =
var activeSubs: seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] =
@[]
for pubsub, cTopicSet in self.contentTopicSubs.pairs:
if cTopicSet.len > 0:
var cTopicSeq = newSeqOfCap[ContentTopic](cTopicSet.len)
for t in cTopicSet:
cTopicSeq.add(t)
activeSubs.add((pubsub, cTopicSeq))
return activeSubs
proc getShardForContentTopic(
self: SubscriptionManager, topic: ContentTopic
): Result[PubsubTopic, string] =
if self.node.wakuAutoSharding.isSome():
let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
return ok($shardObj)
return err("SubscriptionManager requires AutoSharding")
proc isSubscribed*(
self: SubscriptionManager, topic: ContentTopic
): Result[bool, string] =
let shard = ?self.getShardForContentTopic(topic)
return ok(
self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(topic)
)
proc isSubscribed*(
self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
): bool {.raises: [].} =
self.contentTopicSubs.withValue(shard, cTopics):
return cTopics[].contains(contentTopic)
return false
proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] =
if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
return err("SubscriptionManager requires either Relay or Filter Client.")
let shard = ?self.getShardForContentTopic(topic)
if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard):
?self.subscribePubsubTopics(@[shard])
?self.addContentTopicInterest(shard, topic)
return ok()
proc unsubscribe*(
self: SubscriptionManager, topic: ContentTopic
): Result[void, string] =
if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
return err("SubscriptionManager requires either Relay or Filter Client.")
let shard = ?self.getShardForContentTopic(topic)
if self.isSubscribed(shard, topic):
?self.removeContentTopicInterest(shard, topic)
return ok()

View File

@ -1,64 +0,0 @@
import chronos, chronicles
import
waku/[
waku_core,
waku_core/topics,
events/message_events,
waku_node,
common/broker/broker_context,
]
type SubscriptionService* = ref object of RootObj
brokerCtx: BrokerContext
node: WakuNode
proc new*(T: typedesc[SubscriptionService], node: WakuNode): T =
## The storeClient will help to acquire any possible missed messages
return SubscriptionService(brokerCtx: node.brokerCtx, node: node)
proc isSubscribed*(
self: SubscriptionService, topic: ContentTopic
): Result[bool, string] =
var isSubscribed = false
if self.node.wakuRelay.isNil() == false:
return self.node.isSubscribed((kind: ContentSub, topic: topic))
# TODO: Add support for edge mode with Filter subscription management
return ok(isSubscribed)
#TODO: later PR may consider to refactor or place this function elsewhere
# The only important part is that it emits MessageReceivedEvent
proc getReceiveHandler(self: SubscriptionService): WakuRelayHandler =
return proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
let msgHash = computeMessageHash(topic, msg).to0xHex()
info "API received message",
pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash
MessageReceivedEvent.emit(self.brokerCtx, msgHash, msg)
proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] =
let isSubscribed = self.isSubscribed(topic).valueOr:
error "Failed to check subscription status: ", error = error
return err("Failed to check subscription status: " & error)
if isSubscribed == false:
if self.node.wakuRelay.isNil() == false:
self.node.subscribe((kind: ContentSub, topic: topic), self.getReceiveHandler()).isOkOr:
error "Failed to subscribe: ", error = error
return err("Failed to subscribe: " & error)
# TODO: Add support for edge mode with Filter subscription management
return ok()
proc unsubscribe*(
self: SubscriptionService, topic: ContentTopic
): Result[void, string] =
if self.node.wakuRelay.isNil() == false:
self.node.unsubscribe((kind: ContentSub, topic: topic)).isOkOr:
error "Failed to unsubscribe: ", error = error
return err("Failed to unsubscribe: " & error)
# TODO: Add support for edge mode with Filter subscription management
return ok()

View File

@ -19,16 +19,20 @@ import
libp2p/utility
import
../waku_node,
../../waku_relay,
../../waku_core,
../../waku_core/topics/sharding,
../../waku_filter_v2,
../../waku_archive_legacy,
../../waku_archive,
../../waku_store_sync,
../peer_manager,
../../waku_rln_relay
waku/[
waku_relay,
waku_core,
waku_core/topics/sharding,
waku_filter_v2,
waku_archive_legacy,
waku_archive,
waku_store_sync,
waku_rln_relay,
node/waku_node,
node/peer_manager,
common/broker/broker_context,
events/message_events,
]
export waku_relay.WakuRelayHandler
@ -44,14 +48,25 @@ logScope:
## Waku relay
proc registerRelayHandler(
node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler
) =
node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler = nil
): bool =
## Registers the only handler for the given topic.
## Notice that this handler internally calls other handlers, such as filter,
## archive, etc, plus the handler provided by the application.
## Returns `true` if a mesh subscription was created or `false` if the relay
## was already subscribed to the topic.
if node.wakuRelay.isSubscribed(topic):
return
let alreadySubscribed = node.wakuRelay.isSubscribed(topic)
if not appHandler.isNil():
if not alreadySubscribed or not node.legacyAppHandlers.hasKey(topic):
node.legacyAppHandlers[topic] = appHandler
else:
debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler",
topic = topic
if alreadySubscribed:
return false
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
let msgSizeKB = msg.payload.len / 1000
@ -82,6 +97,9 @@ proc registerRelayHandler(
node.wakuStoreReconciliation.messageIngress(topic, msg)
proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
MessageSeenEvent.emit(node.brokerCtx, topic, msg)
let uniqueTopicHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
@ -89,7 +107,15 @@ proc registerRelayHandler(
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
await syncHandler(topic, msg)
await appHandler(topic, msg)
await internalHandler(topic, msg)
# Call the legacy (kernel API) app handler if it exists.
# Normally, hasKey is false and the MessageSeenEvent bus (new API) is used instead.
# But we need to support legacy behavior (kernel API use), hence this.
# NOTE: We can delete `legacyAppHandlers` if instead we refactor WakuRelay to support multiple
# PubsubTopic handlers, since that's actually supported by libp2p PubSub (bigger refactor...)
if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil():
await node.legacyAppHandlers[topic](topic, msg)
node.wakuRelay.subscribe(topic, uniqueTopicHandler)
@ -115,8 +141,11 @@ proc subscribe*(
): Result[void, string] =
## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on
## this topic. WakuRelayHandler is a method that takes a topic and a Waku message.
## If `handler` is nil, the API call will subscribe to the topic in the relay mesh
## but no app handler will be registered at this time (it can be registered later with
## another call to this proc for the same gossipsub topic).
if node.wakuRelay.isNil():
if isNil(node.wakuRelay):
error "Invalid API call to `subscribe`. WakuRelay not mounted."
return err("Invalid API call to `subscribe`. WakuRelay not mounted.")
@ -124,13 +153,15 @@ proc subscribe*(
error "Failed to decode subscription event", error = error
return err("Failed to decode subscription event: " & error)
if node.wakuRelay.isSubscribed(pubsubTopic):
warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic
return ok()
info "subscribe", pubsubTopic, contentTopicOp
node.registerRelayHandler(pubsubTopic, handler)
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
if node.registerRelayHandler(pubsubTopic, handler):
info "subscribe", pubsubTopic, contentTopicOp
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
else:
if isNil(handler):
warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic
else:
info "subscribe (was already subscribed in the mesh; appHandler set)",
pubsubTopic = pubsubTopic
return ok()
@ -138,8 +169,10 @@ proc unsubscribe*(
node: WakuNode, subscription: SubscriptionEvent
): Result[void, string] =
## Unsubscribes from a specific PubSub or Content topic.
## This will both unsubscribe from the relay mesh and remove the app handler, if any.
## NOTE: This works because using MAPI and Kernel API at the same time is unsupported.
if node.wakuRelay.isNil():
if isNil(node.wakuRelay):
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.")
@ -147,13 +180,20 @@ proc unsubscribe*(
error "Failed to decode unsubscribe event", error = error
return err("Failed to decode unsubscribe event: " & error)
if not node.wakuRelay.isSubscribed(pubsubTopic):
warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic
return ok()
let hadHandler = node.legacyAppHandlers.hasKey(pubsubTopic)
if hadHandler:
node.legacyAppHandlers.del(pubsubTopic)
info "unsubscribe", pubsubTopic, contentTopicOp
node.wakuRelay.unsubscribe(pubsubTopic)
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
if node.wakuRelay.isSubscribed(pubsubTopic):
info "unsubscribe", pubsubTopic, contentTopicOp
node.wakuRelay.unsubscribe(pubsubTopic)
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
else:
if not hadHandler:
warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic
else:
info "unsubscribe (was not subscribed in the mesh; appHandler removed)",
pubsubTopic = pubsubTopic
return ok()

View File

@ -146,6 +146,8 @@ type
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
rateLimitSettings*: ProtocolRateLimitSettings
legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler]
## Kernel API Relay appHandlers (if any)
wakuMix*: WakuMix
edgeTopicsHealth*: Table[PubsubTopic, TopicHealth]
edgeHealthEvent*: AsyncEvent

View File

@ -5,19 +5,19 @@ import std/tables, results, chronicles, chronos
import ./push_handler, ../topics, ../message
## Subscription manager
type SubscriptionManager* = object
type LegacySubscriptionManager* = object
subscriptions: TableRef[(string, ContentTopic), FilterPushHandler]
proc init*(T: type SubscriptionManager): T =
SubscriptionManager(
proc init*(T: type LegacySubscriptionManager): T =
LegacySubscriptionManager(
subscriptions: newTable[(string, ContentTopic), FilterPushHandler]()
)
proc clear*(m: var SubscriptionManager) =
proc clear*(m: var LegacySubscriptionManager) =
m.subscriptions.clear()
proc registerSubscription*(
m: SubscriptionManager,
m: LegacySubscriptionManager,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic,
handler: FilterPushHandler,
@ -29,12 +29,12 @@ proc registerSubscription*(
error "failed to register filter subscription", error = getCurrentExceptionMsg()
proc removeSubscription*(
m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
m: LegacySubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
) =
m.subscriptions.del((pubsubTopic, contentTopic))
proc notifySubscriptionHandler*(
m: SubscriptionManager,
m: LegacySubscriptionManager,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic,
message: WakuMessage,
@ -48,5 +48,5 @@ proc notifySubscriptionHandler*(
except CatchableError:
discard
proc getSubscriptionsCount*(m: SubscriptionManager): int =
proc getSubscriptionsCount*(m: LegacySubscriptionManager): int =
m.subscriptions.len()