Implement stateful SubscriptionService for Core mode (WIP)

* SubscriptionService tracks shard and content topic interest
* Emit MessageReceivedEvent on subscribed content topics
* Add selectPeers() to PeerManager for future edge node sub impl
* Add test_api_subscriptions.nim with a placeholder sub test
This commit is contained in:
Fabiana Cecin 2026-02-20 09:34:40 -03:00
parent f208cb79ed
commit 0eb6aa07c6
No known key found for this signature in database
GPG Key ID: BCAB8A55CB51B6C7
5 changed files with 307 additions and 67 deletions

View File

@ -1,3 +1,8 @@
{.used.}
import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health
import
./test_entry_nodes,
./test_node_conf,
./test_api_send,
./test_api_subscription,
./test_api_health

View File

@ -0,0 +1,109 @@
{.used.}
import std/strutils
import chronos, testutils/unittests, stew/byteutils
import ../testlib/[common, testasync]
import
waku, waku/[waku_node, waku_core, common/broker/broker_context, events/message_events]
import waku/api/api_conf, waku/factory/waku_conf
type ReceiveEventListenerManager = ref object
brokerCtx: BrokerContext
receivedListener: MessageReceivedEventListener
receivedFuture: Future[void]
receivedMessages: seq[WakuMessage]
proc newReceiveEventListenerManager(
brokerCtx: BrokerContext
): ReceiveEventListenerManager =
let manager = ReceiveEventListenerManager(brokerCtx: brokerCtx, receivedMessages: @[])
manager.receivedFuture = newFuture[void]("receivedEvent")
manager.receivedListener = MessageReceivedEvent.listen(
brokerCtx,
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
manager.receivedMessages.add(event.message)
echo "RECEIVED EVENT TRIGGERED: contentTopic=", event.message.contentTopic
if not manager.receivedFuture.finished():
manager.receivedFuture.complete()
,
).valueOr:
raiseAssert error
return manager
proc teardown(manager: ReceiveEventListenerManager) =
MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener)
proc waitForEvent(
manager: ReceiveEventListenerManager, timeout: Duration
): Future[bool] {.async.} =
return await manager.receivedFuture.withTimeout(timeout)
proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0)
result = NodeConfig.init(
mode = mode,
protocolsConfig = ProtocolsConfig.init(
entryNodes = @[],
clusterId = 1,
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
),
networkingConfig = netConf,
p2pReliability = true,
)
suite "Waku API - Subscription Service":
asyncTest "Subscription API, two relays with subscribe and receive message":
var node1, node2: Waku
lockNewGlobalBrokerContext:
node1 = (await createNode(createApiNodeConf())).valueOr:
raiseAssert error
(await startWaku(addr node1)).isOkOr:
raiseAssert "Failed to start node1"
lockNewGlobalBrokerContext:
node2 = (await createNode(createApiNodeConf())).valueOr:
raiseAssert error
(await startWaku(addr node2)).isOkOr:
raiseAssert "Failed to start node2"
let node2PeerInfo = node2.node.peerInfo.toRemotePeerInfo()
await node1.node.connectToNodes(@[node2PeerInfo])
await sleepAsync(2.seconds)
let testTopic = ContentTopic("/waku/2/test-content/proto")
(await node1.subscribe(testTopic)).isOkOr:
raiseAssert "Node1 failed to subscribe: " & error
(await node2.subscribe(testTopic)).isOkOr:
raiseAssert "Node2 failed to subscribe: " & error
await sleepAsync(2.seconds)
let eventManager = newReceiveEventListenerManager(node2.brokerCtx)
defer:
eventManager.teardown()
let envelope = MessageEnvelope.init(testTopic, "hello world payload")
let sendResult = await node1.send(envelope)
check sendResult.isOk()
const eventTimeout = 5.seconds
let receivedInTime = await eventManager.waitForEvent(eventTimeout)
check receivedInTime == true
check eventManager.receivedMessages.len == 1
let receivedMsg = eventManager.receivedMessages[0]
check receivedMsg.contentTopic == testTopic
check string.fromBytes(receivedMsg.payload) == "hello world payload"
(await node1.stop()).isOkOr:
raiseAssert error
(await node2.stop()).isOkOr:
raiseAssert error

View File

@ -48,6 +48,15 @@ proc send*(
): Future[Result[RequestId, string]] {.async.} =
?checkApiAvailability(w)
let isSubbed = w.deliveryService.subscriptionService
.isSubscribed(envelope.contentTopic)
.valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
let subRes = w.deliveryService.subscriptionService.subscribe(envelope.contentTopic)
if subRes.isErr():
warn "Failed to auto-subscribe", error = subRes.error
let requestId = RequestId.new(w.rng)
let deliveryTask = DeliveryTask.new(requestId, envelope, w.brokerCtx).valueOr:

View File

@ -1,64 +1,167 @@
import chronos, chronicles
import std/[sets, tables, options, strutils], chronos, chronicles, results
import
waku/[
waku_core,
waku_core/topics,
waku_core/topics/sharding,
events/message_events,
waku_node,
waku_relay,
common/broker/broker_context,
]
type SubscriptionService* = ref object of RootObj
brokerCtx: BrokerContext
node: WakuNode
shardSubs: HashSet[PubsubTopic]
contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
relayHandler: WakuRelayHandler
proc new*(T: typedesc[SubscriptionService], node: WakuNode): T =
## The storeClient will help to acquire any possible missed messages
let service = SubscriptionService(
node: node,
shardSubs: initHashSet[PubsubTopic](),
contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]](),
)
return SubscriptionService(brokerCtx: node.brokerCtx, node: node)
service.relayHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
if not service.contentTopicSubs.hasKey(topic) or
not service.contentTopicSubs[topic].contains(msg.contentTopic):
return
let msgHash = computeMessageHash(topic, msg).to0xHex()
info "MessageReceivedEvent",
pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash
MessageReceivedEvent.emit(service.node.brokerCtx, msgHash, msg)
return service
proc getShardForContentTopic(
self: SubscriptionService, topic: ContentTopic
): Result[PubsubTopic, string] =
if self.node.wakuAutoSharding.isSome():
let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
return ok($shardObj)
return
err("Manual sharding is not supported in this API. Autosharding must be enabled.")
proc doSubscribe(self: SubscriptionService, shard: PubsubTopic): Result[void, string] =
self.node.subscribe((kind: PubsubSub, topic: shard), self.relayHandler).isOkOr:
error "Failed to subscribe to Relay shard", shard = shard, error = error
return err("Failed to subscribe: " & error)
return ok()
proc doUnsubscribe(
self: SubscriptionService, shard: PubsubTopic
): Result[void, string] =
self.node.unsubscribe((kind: PubsubUnsub, topic: shard)).isOkOr:
error "Failed to unsubscribe from Relay shard", shard = shard, error = error
return err("Failed to unsubscribe: " & error)
return ok()
proc isSubscribed*(
self: SubscriptionService, topic: ContentTopic
): Result[bool, string] =
var isSubscribed = false
if self.node.wakuRelay.isNil() == false:
return self.node.isSubscribed((kind: ContentSub, topic: topic))
if self.node.wakuRelay.isNil():
return err("SubscriptionService currently only supports Relay (Core) mode.")
# TODO: Add support for edge mode with Filter subscription management
return ok(isSubscribed)
let shard = ?self.getShardForContentTopic(topic)
#TODO: later PR may consider to refactor or place this function elsewhere
# The only important part is that it emits MessageReceivedEvent
proc getReceiveHandler(self: SubscriptionService): WakuRelayHandler =
return proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
let msgHash = computeMessageHash(topic, msg).to0xHex()
info "API received message",
pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash
if self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(
topic
):
return ok(true)
MessageReceivedEvent.emit(self.brokerCtx, msgHash, msg)
return ok(false)
proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] =
let isSubscribed = self.isSubscribed(topic).valueOr:
error "Failed to check subscription status: ", error = error
return err("Failed to check subscription status: " & error)
if self.node.wakuRelay.isNil():
return err("SubscriptionService currently only supports Relay (Core) mode.")
if isSubscribed == false:
if self.node.wakuRelay.isNil() == false:
self.node.subscribe((kind: ContentSub, topic: topic), self.getReceiveHandler()).isOkOr:
error "Failed to subscribe: ", error = error
return err("Failed to subscribe: " & error)
let shard = ?self.getShardForContentTopic(topic)
# TODO: Add support for edge mode with Filter subscription management
let needShardSub =
not self.shardSubs.contains(shard) and not self.contentTopicSubs.hasKey(shard)
if needShardSub:
?self.doSubscribe(shard)
self.contentTopicSubs.mgetOrPut(shard, initHashSet[ContentTopic]()).incl(topic)
return ok()
proc unsubscribe*(
self: SubscriptionService, topic: ContentTopic
): Result[void, string] =
if self.node.wakuRelay.isNil() == false:
self.node.unsubscribe((kind: ContentSub, topic: topic)).isOkOr:
error "Failed to unsubscribe: ", error = error
return err("Failed to unsubscribe: " & error)
if self.node.wakuRelay.isNil():
return err("SubscriptionService currently only supports Relay (Core) mode.")
let shard = ?self.getShardForContentTopic(topic)
if self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(
topic
):
let isLastTopic = self.contentTopicSubs[shard].len == 1
let needShardUnsub = isLastTopic and not self.shardSubs.contains(shard)
if needShardUnsub:
?self.doUnsubscribe(shard)
self.contentTopicSubs[shard].excl(topic)
if self.contentTopicSubs[shard].len == 0:
self.contentTopicSubs.del(shard)
return ok()
proc subscribeShard*(
self: SubscriptionService, shards: seq[PubsubTopic]
): Result[void, string] =
if self.node.wakuRelay.isNil():
return err("SubscriptionService currently only supports Relay (Core) mode.")
var errors: seq[string] = @[]
for shard in shards:
if not self.shardSubs.contains(shard):
let needShardSub = not self.contentTopicSubs.hasKey(shard)
if needShardSub:
let res = self.doSubscribe(shard)
if res.isErr():
errors.add("Shard " & shard & " failed: " & res.error)
continue
self.shardSubs.incl(shard)
if errors.len > 0:
return err("Batch subscribe had errors: " & errors.join("; "))
return ok()
proc unsubscribeShard*(
self: SubscriptionService, shards: seq[PubsubTopic]
): Result[void, string] =
if self.node.wakuRelay.isNil():
return err("SubscriptionService currently only supports Relay (Core) mode.")
var errors: seq[string] = @[]
for shard in shards:
if self.shardSubs.contains(shard):
let needShardUnsub = not self.contentTopicSubs.hasKey(shard)
if needShardUnsub:
let res = self.doUnsubscribe(shard)
if res.isErr():
errors.add("Shard " & shard & " failed: " & res.error)
continue
self.shardSubs.excl(shard)
if errors.len > 0:
return err("Batch unsubscribe had errors: " & errors.join("; "))
# TODO: Add support for edge mode with Filter subscription management
return ok()

View File

@ -131,6 +131,14 @@ proc protocolMatcher*(codec: string): Matcher =
return match
proc peerSupportsShard*(peer: RemotePeerInfo, shardInfo: RelayShard): bool =
## Returns true if the given peer has an ENR record with the given shard
## or if it has the shard in its shards list (populated from metadata protocol).
return
(peer.enr.isSome() and peer.enr.get().containsShard(shardInfo)) or
(peer.shards.len > 0 and peer.shards.contains(shardInfo.shardId))
#~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Peer Storage Management #
#~~~~~~~~~~~~~~~~~~~~~~~~~~#
@ -217,54 +225,60 @@ proc loadFromStorage(pm: PeerManager) {.gcsafe.} =
trace "recovered peers from storage", amount = amount
proc selectPeer*(
pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
): Option[RemotePeerInfo] =
# Selects the best peer for a given protocol
proc selectPeers*(
pm: PeerManager,
proto: string,
amount: int,
shard: Option[PubsubTopic] = none(PubsubTopic),
): seq[RemotePeerInfo] =
# Selects the best peers for a given protocol
var peers = pm.switch.peerStore.getPeersByProtocol(proto)
trace "Selecting peer from peerstore",
protocol = proto, peers, address = cast[uint](pm.switch.peerStore)
trace "Selecting peers from peerstore",
amount = amount, protocol = proto, peers, address = cast[uint](pm.switch.peerStore)
if shard.isSome():
# Parse the shard from the pubsub topic to get cluster and shard ID
let shardInfo = RelayShard.parse(shard.get()).valueOr:
trace "Failed to parse shard from pubsub topic", topic = shard.get()
return none(RemotePeerInfo)
return @[]
# Filter peers that support the requested shard
# Check both ENR (if present) and the shards field on RemotePeerInfo
peers.keepItIf(
# Check ENR if available
(it.enr.isSome() and it.enr.get().containsShard(shard.get())) or
# Otherwise check the shards field directly
(it.shards.len > 0 and it.shards.contains(shardInfo.shardId))
)
peers.keepItIf(it.peerSupportsShard(shardInfo))
shuffle(peers)
# For non relay protocols, we may select the peer that is slotted for the given protocol
if proto != WakuRelayCodec:
pm.serviceSlots.withValue(proto, serviceSlot):
trace "Got peer from service slots",
peerId = serviceSlot[].peerId, multi = serviceSlot[].addrs[0], protocol = proto
return @[serviceSlot[]]
# No criteria for selecting a peer for WakuRelay, random one
if proto == WakuRelayCodec:
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
if peers.len > 0:
trace "Got peer from peerstore",
peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto
return some(peers[0])
trace "No peer found for protocol", protocol = proto
return none(RemotePeerInfo)
# For other protocols, we select the peer that is slotted for the given protocol
pm.serviceSlots.withValue(proto, serviceSlot):
trace "Got peer from service slots",
peerId = serviceSlot[].peerId, multi = serviceSlot[].addrs[0], protocol = proto
return some(serviceSlot[])
# If not slotted, we select a random peer for the given protocol
# If no slotted peer available, select random peers for the given protocol
if peers.len > 0:
trace "Got peer from peerstore",
peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto
return some(peers[0])
# TODO: proper heuristic here that compares peer scores and selects the best ones.
shuffle(peers)
let count = min(peers.len, amount)
let selected = peers[0 ..< count]
for i, peer in selected:
trace "Selected peer from peerstore",
peerId = peer.peerId,
multi = peer.addrs[0],
protocol = proto,
num = $(i + 1) & "/" & $count
return selected
trace "No peer found for protocol", protocol = proto
return @[]
proc selectPeer*(
pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
): Option[RemotePeerInfo] =
let peers = pm.selectPeers(proto, 1, shard)
if peers.len > 0:
return some(peers[0])
return none(RemotePeerInfo)
# Adds a peer to the service slots, which is a list of peers that are slotted for a given protocol