mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-02-27 21:53:16 +00:00
Merge 4ddc958e050626d5e47e80e72c36b91e37225643 into ba85873f03a1da6ab04287949849815fd97b7bfd
This commit is contained in:
commit
28b4a27fb8
@ -1,3 +1,8 @@
|
||||
{.used.}
|
||||
|
||||
import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health
|
||||
import
|
||||
./test_entry_nodes,
|
||||
./test_node_conf,
|
||||
./test_api_send,
|
||||
./test_api_subscription,
|
||||
./test_api_health
|
||||
|
||||
319
tests/api/test_api_subscription.nim
Normal file
319
tests/api/test_api_subscription.nim
Normal file
@ -0,0 +1,319 @@
|
||||
{.used.}
|
||||
|
||||
import std/[strutils, net, options]
|
||||
import chronos, testutils/unittests, stew/byteutils
|
||||
import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto]
|
||||
import ../testlib/[common, wakucore, wakunode, testasync]
|
||||
|
||||
import
|
||||
waku,
|
||||
waku/[
|
||||
waku_node,
|
||||
waku_core,
|
||||
common/broker/broker_context,
|
||||
events/message_events,
|
||||
waku_relay/protocol,
|
||||
]
|
||||
import waku/api/api_conf, waku/factory/waku_conf
|
||||
|
||||
# TODO: Edge testing (after MAPI edge support is completed)
|
||||
|
||||
const TestTimeout = chronos.seconds(10)
|
||||
const NegativeTestTimeout = chronos.seconds(2)
|
||||
const DefaultShard = PubsubTopic("/waku/2/rs/1/0")
|
||||
|
||||
type ReceiveEventListenerManager = ref object
|
||||
brokerCtx: BrokerContext
|
||||
receivedListener: MessageReceivedEventListener
|
||||
receivedEvent: AsyncEvent
|
||||
receivedMessages: seq[WakuMessage]
|
||||
targetCount: int
|
||||
|
||||
proc newReceiveEventListenerManager(
|
||||
brokerCtx: BrokerContext, expectedCount: int = 1
|
||||
): ReceiveEventListenerManager =
|
||||
let manager = ReceiveEventListenerManager(
|
||||
brokerCtx: brokerCtx, receivedMessages: @[], targetCount: expectedCount
|
||||
)
|
||||
manager.receivedEvent = newAsyncEvent()
|
||||
|
||||
manager.receivedListener = MessageReceivedEvent
|
||||
.listen(
|
||||
brokerCtx,
|
||||
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
|
||||
manager.receivedMessages.add(event.message)
|
||||
|
||||
if manager.receivedMessages.len >= manager.targetCount:
|
||||
manager.receivedEvent.fire()
|
||||
,
|
||||
)
|
||||
.expect("Failed to listen to MessageReceivedEvent")
|
||||
|
||||
return manager
|
||||
|
||||
proc teardown(manager: ReceiveEventListenerManager) =
|
||||
MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener)
|
||||
|
||||
proc waitForEvents(
|
||||
manager: ReceiveEventListenerManager, timeout: Duration
|
||||
): Future[bool] {.async.} =
|
||||
return await manager.receivedEvent.wait().withTimeout(timeout)
|
||||
|
||||
proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
|
||||
let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0)
|
||||
result = NodeConfig.init(
|
||||
mode = mode,
|
||||
protocolsConfig = ProtocolsConfig.init(
|
||||
entryNodes = @[],
|
||||
clusterId = 1,
|
||||
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
|
||||
),
|
||||
networkingConfig = netConf,
|
||||
p2pReliability = true,
|
||||
)
|
||||
|
||||
proc setupSubscriberNode(conf: NodeConfig): Future[Waku] {.async.} =
|
||||
var node: Waku
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await createNode(conf)).expect("Failed to create subscriber node")
|
||||
(await startWaku(addr node)).expect("Failed to start subscriber node")
|
||||
return node
|
||||
|
||||
proc waitForMesh*(node: WakuNode, shard: PubsubTopic) {.async.} =
|
||||
for _ in 0 ..< 50:
|
||||
if node.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0:
|
||||
return
|
||||
await sleepAsync(100.milliseconds)
|
||||
raise newException(ValueError, "GossipSub Mesh failed to stabilize")
|
||||
|
||||
proc publishWhenMeshReady(
|
||||
publisher: WakuNode,
|
||||
pubsubTopic: PubsubTopic,
|
||||
contentTopic: ContentTopic,
|
||||
payload: seq[byte],
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
await waitForMesh(publisher, pubsubTopic)
|
||||
|
||||
let msg = WakuMessage(
|
||||
payload: payload, contentTopic: contentTopic, version: 0, timestamp: now()
|
||||
)
|
||||
return await publisher.publish(some(pubsubTopic), msg)
|
||||
|
||||
suite "Messaging API, SubscriptionService":
|
||||
var
|
||||
publisherNode {.threadvar.}: WakuNode
|
||||
publisherPeerInfo {.threadvar.}: RemotePeerInfo
|
||||
publisherPeerId {.threadvar.}: PeerId
|
||||
|
||||
subscriberNode {.threadvar.}: Waku
|
||||
|
||||
asyncSetup:
|
||||
lockNewGlobalBrokerContext:
|
||||
publisherNode =
|
||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||
|
||||
publisherNode.mountMetadata(1, @[0'u16]).expect("Failed to mount metadata")
|
||||
(await publisherNode.mountRelay()).expect("Failed to mount relay")
|
||||
await publisherNode.mountLibp2pPing()
|
||||
await publisherNode.start()
|
||||
|
||||
publisherPeerInfo = publisherNode.peerInfo.toRemotePeerInfo()
|
||||
publisherPeerId = publisherNode.peerInfo.peerId
|
||||
|
||||
proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
discard
|
||||
|
||||
publisherNode.subscribe((kind: PubsubSub, topic: DefaultShard), dummyHandler).expect(
|
||||
"Failed to subscribe publisherNode"
|
||||
)
|
||||
|
||||
asyncTeardown:
|
||||
if not subscriberNode.isNil():
|
||||
(await subscriberNode.stop()).expect("Failed to stop subscriber node")
|
||||
subscriberNode = nil
|
||||
|
||||
if not publisherNode.isNil():
|
||||
await publisherNode.stop()
|
||||
publisherNode = nil
|
||||
|
||||
asyncTest "Subscription API, relay node auto subscribe and receive message":
|
||||
subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core))
|
||||
await subscriberNode.node.connectToNodes(@[publisherPeerInfo])
|
||||
let testTopic = ContentTopic("/waku/2/test-content/proto")
|
||||
|
||||
(await subscriberNode.subscribe(testTopic)).expect(
|
||||
"subscriberNode failed to subscribe"
|
||||
)
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
|
||||
defer:
|
||||
eventManager.teardown()
|
||||
|
||||
discard (
|
||||
await publishWhenMeshReady(
|
||||
publisherNode, DefaultShard, testTopic, "Hello, world!".toBytes()
|
||||
)
|
||||
).expect("Publish failed")
|
||||
|
||||
require await eventManager.waitForEvents(TestTimeout)
|
||||
require eventManager.receivedMessages.len == 1
|
||||
check eventManager.receivedMessages[0].contentTopic == testTopic
|
||||
|
||||
asyncTest "Subscription API, relay node ignores unsubscribed content topics on same shard":
|
||||
subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core))
|
||||
await subscriberNode.node.connectToNodes(@[publisherPeerInfo])
|
||||
|
||||
let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto")
|
||||
let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto")
|
||||
(await subscriberNode.subscribe(subbedTopic)).expect("failed to subscribe")
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
|
||||
defer:
|
||||
eventManager.teardown()
|
||||
|
||||
discard (
|
||||
await publishWhenMeshReady(
|
||||
publisherNode, DefaultShard, ignoredTopic, "Ghost Msg".toBytes()
|
||||
)
|
||||
).expect("Publish failed")
|
||||
|
||||
check not await eventManager.waitForEvents(NegativeTestTimeout)
|
||||
check eventManager.receivedMessages.len == 0
|
||||
|
||||
asyncTest "Subscription API, relay node unsubscribe stops message receipt":
|
||||
subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core))
|
||||
await subscriberNode.node.connectToNodes(@[publisherPeerInfo])
|
||||
let testTopic = ContentTopic("/waku/2/unsub-test/proto")
|
||||
|
||||
(await subscriberNode.subscribe(testTopic)).expect("failed to subscribe")
|
||||
subscriberNode.unsubscribe(testTopic).expect("failed to unsubscribe")
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
|
||||
defer:
|
||||
eventManager.teardown()
|
||||
|
||||
discard (
|
||||
await publishWhenMeshReady(
|
||||
publisherNode, DefaultShard, testTopic, "Should be dropped".toBytes()
|
||||
)
|
||||
).expect("Publish failed")
|
||||
|
||||
check not await eventManager.waitForEvents(NegativeTestTimeout)
|
||||
check eventManager.receivedMessages.len == 0
|
||||
|
||||
asyncTest "Subscription API, overlapping topics on same shard maintain correct isolation":
|
||||
subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core))
|
||||
await subscriberNode.node.connectToNodes(@[publisherPeerInfo])
|
||||
|
||||
let topicA = ContentTopic("/waku/2/topic-a/proto")
|
||||
let topicB = ContentTopic("/waku/2/topic-b/proto")
|
||||
(await subscriberNode.subscribe(topicA)).expect("failed to sub A")
|
||||
(await subscriberNode.subscribe(topicB)).expect("failed to sub B")
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
|
||||
defer:
|
||||
eventManager.teardown()
|
||||
|
||||
await waitForMesh(publisherNode, DefaultShard)
|
||||
|
||||
subscriberNode.unsubscribe(topicA).expect("failed to unsub A")
|
||||
|
||||
discard (
|
||||
await publisherNode.publish(
|
||||
some(DefaultShard),
|
||||
WakuMessage(
|
||||
payload: "Dropped Message".toBytes(),
|
||||
contentTopic: topicA,
|
||||
version: 0,
|
||||
timestamp: now(),
|
||||
),
|
||||
)
|
||||
).expect("Publish A failed")
|
||||
|
||||
discard (
|
||||
await publisherNode.publish(
|
||||
some(DefaultShard),
|
||||
WakuMessage(
|
||||
payload: "Kept Msg".toBytes(),
|
||||
contentTopic: topicB,
|
||||
version: 0,
|
||||
timestamp: now(),
|
||||
),
|
||||
)
|
||||
).expect("Publish B failed")
|
||||
|
||||
require await eventManager.waitForEvents(TestTimeout)
|
||||
require eventManager.receivedMessages.len == 1
|
||||
check eventManager.receivedMessages[0].contentTopic == topicB
|
||||
|
||||
asyncTest "Subscription API, redundant subs tolerated and subs are removed":
|
||||
subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core))
|
||||
await subscriberNode.node.connectToNodes(@[publisherPeerInfo])
|
||||
let glitchTopic = ContentTopic("/waku/2/glitch/proto")
|
||||
|
||||
(await subscriberNode.subscribe(glitchTopic)).expect("failed to sub")
|
||||
(await subscriberNode.subscribe(glitchTopic)).expect("failed to double sub")
|
||||
subscriberNode.unsubscribe(glitchTopic).expect("failed to unsub")
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
|
||||
defer:
|
||||
eventManager.teardown()
|
||||
|
||||
discard (
|
||||
await publishWhenMeshReady(
|
||||
publisherNode, DefaultShard, glitchTopic, "Ghost Msg".toBytes()
|
||||
)
|
||||
).expect("Publish failed")
|
||||
|
||||
check not await eventManager.waitForEvents(NegativeTestTimeout)
|
||||
check eventManager.receivedMessages.len == 0
|
||||
|
||||
asyncTest "Subscription API, resubscribe to an unsubscribed topic":
|
||||
subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core))
|
||||
await subscriberNode.node.connectToNodes(@[publisherPeerInfo])
|
||||
let testTopic = ContentTopic("/waku/2/resub-test/proto")
|
||||
|
||||
# Subscribe
|
||||
(await subscriberNode.subscribe(testTopic)).expect("Initial sub failed")
|
||||
|
||||
var eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
|
||||
discard (
|
||||
await publishWhenMeshReady(
|
||||
publisherNode, DefaultShard, testTopic, "Msg 1".toBytes()
|
||||
)
|
||||
).expect("Pub 1 failed")
|
||||
|
||||
require await eventManager.waitForEvents(TestTimeout)
|
||||
eventManager.teardown()
|
||||
|
||||
# Unsubscribe and verify teardown
|
||||
subscriberNode.unsubscribe(testTopic).expect("Unsub failed")
|
||||
eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
|
||||
|
||||
discard (
|
||||
await publisherNode.publish(
|
||||
some(DefaultShard),
|
||||
WakuMessage(
|
||||
payload: "Ghost".toBytes(),
|
||||
contentTopic: testTopic,
|
||||
version: 0,
|
||||
timestamp: now(),
|
||||
),
|
||||
)
|
||||
).expect("Ghost pub failed")
|
||||
|
||||
check not await eventManager.waitForEvents(NegativeTestTimeout)
|
||||
eventManager.teardown()
|
||||
|
||||
# Resubscribe
|
||||
(await subscriberNode.subscribe(testTopic)).expect("Resub failed")
|
||||
eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
|
||||
|
||||
discard (
|
||||
await publishWhenMeshReady(
|
||||
publisherNode, DefaultShard, testTopic, "Msg 2".toBytes()
|
||||
)
|
||||
).expect("Pub 2 failed")
|
||||
|
||||
require await eventManager.waitForEvents(TestTimeout)
|
||||
check eventManager.receivedMessages[0].payload == "Msg 2".toBytes()
|
||||
@ -374,6 +374,12 @@ procSuite "WakuNode - Store":
|
||||
waitFor allFutures(client.stop(), server.stop())
|
||||
|
||||
test "Store protocol queries overrun request rate limitation":
|
||||
when defined(macosx):
|
||||
# on macos CI, this test is resulting a code 200 (OK) instead of a 429 error
|
||||
# means the runner is somehow too slow to cause a request limit failure
|
||||
skip()
|
||||
return
|
||||
|
||||
## Setup
|
||||
let
|
||||
serverKey = generateSecp256k1Key()
|
||||
|
||||
@ -48,6 +48,15 @@ proc send*(
|
||||
): Future[Result[RequestId, string]] {.async.} =
|
||||
?checkApiAvailability(w)
|
||||
|
||||
let isSubbed = w.deliveryService.subscriptionService
|
||||
.isSubscribed(envelope.contentTopic)
|
||||
.valueOr(false)
|
||||
if not isSubbed:
|
||||
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
|
||||
w.deliveryService.subscriptionService.subscribe(envelope.contentTopic).isOkOr:
|
||||
warn "Failed to auto-subscribe", error = error
|
||||
return err("Failed to auto-subscribe before sending: " & error)
|
||||
|
||||
let requestId = RequestId.new(w.rng)
|
||||
|
||||
let deliveryTask = DeliveryTask.new(requestId, envelope, w.brokerCtx).valueOr:
|
||||
|
||||
@ -1,6 +1,4 @@
|
||||
import waku/common/broker/event_broker
|
||||
import waku/api/types
|
||||
import waku/waku_core/message
|
||||
import waku/[api/types, waku_core/message, waku_core/topics, common/broker/event_broker]
|
||||
|
||||
export types
|
||||
|
||||
@ -28,3 +26,9 @@ EventBroker:
|
||||
type MessageReceivedEvent* = object
|
||||
messageHash*: string
|
||||
message*: WakuMessage
|
||||
|
||||
EventBroker:
|
||||
# Internal event emitted when a message arrives from the network via any protocol
|
||||
type MessageSeenEvent* = object
|
||||
topic*: PubsubTopic
|
||||
message*: WakuMessage
|
||||
|
||||
@ -35,6 +35,7 @@ import
|
||||
node/health_monitor,
|
||||
node/waku_metrics,
|
||||
node/delivery_service/delivery_service,
|
||||
node/delivery_service/subscription_service,
|
||||
rest_api/message_cache,
|
||||
rest_api/endpoint/server,
|
||||
rest_api/endpoint/builder as rest_server_builder,
|
||||
@ -453,7 +454,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
|
||||
).isOkOr:
|
||||
error "Failed to set RequestProtocolHealth provider", error = error
|
||||
|
||||
## Setup RequestHealthReport provider (The lost child)
|
||||
## Setup RequestHealthReport provider
|
||||
|
||||
RequestHealthReport.setProvider(
|
||||
globalBrokerContext(),
|
||||
@ -514,6 +515,10 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
if not waku.wakuDiscv5.isNil():
|
||||
await waku.wakuDiscv5.stop()
|
||||
|
||||
if not waku.deliveryService.isNil():
|
||||
await waku.deliveryService.stopDeliveryService()
|
||||
waku.deliveryService = nil
|
||||
|
||||
if not waku.node.isNil():
|
||||
await waku.node.stop()
|
||||
|
||||
|
||||
@ -38,9 +38,11 @@ proc new*(
|
||||
)
|
||||
|
||||
proc startDeliveryService*(self: DeliveryService) =
|
||||
self.sendService.startSendService()
|
||||
self.subscriptionService.startSubscriptionService()
|
||||
self.recvService.startRecvService()
|
||||
self.sendService.startSendService()
|
||||
|
||||
proc stopDeliveryService*(self: DeliveryService) {.async.} =
|
||||
self.sendService.stopSendService()
|
||||
await self.sendService.stopSendService()
|
||||
await self.recvService.stopRecvService()
|
||||
await self.subscriptionService.stopSubscriptionService()
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
## receive and is backed by store-v3 requests to get an additional degree of certainty
|
||||
##
|
||||
|
||||
import std/[tables, sequtils, options]
|
||||
import std/[tables, sequtils, options, sets]
|
||||
import chronos, chronicles, libp2p/utility
|
||||
import ../[subscription_service]
|
||||
import
|
||||
@ -13,6 +13,7 @@ import
|
||||
waku_filter_v2/client,
|
||||
waku_core/topics,
|
||||
events/delivery_events,
|
||||
events/message_events,
|
||||
waku_node,
|
||||
common/broker/broker_context,
|
||||
]
|
||||
@ -35,13 +36,8 @@ type RecvMessage = object
|
||||
|
||||
type RecvService* = ref object of RootObj
|
||||
brokerCtx: BrokerContext
|
||||
topicsInterest: Table[PubsubTopic, seq[ContentTopic]]
|
||||
## Tracks message verification requests and when was the last time a
|
||||
## pubsub topic was verified for missing messages
|
||||
## The key contains pubsub-topics
|
||||
node: WakuNode
|
||||
onSubscribeListener: OnFilterSubscribeEventListener
|
||||
onUnsubscribeListener: OnFilterUnsubscribeEventListener
|
||||
seenMsgListener: MessageSeenEventListener
|
||||
subscriptionService: SubscriptionService
|
||||
|
||||
recentReceivedMsgs: seq[RecvMessage]
|
||||
@ -95,20 +91,20 @@ proc msgChecker(self: RecvService) {.async.} =
|
||||
self.endTimeToCheck = getNowInNanosecondTime()
|
||||
|
||||
var msgHashesInStore = newSeq[WakuMessageHash](0)
|
||||
for pubsubTopic, cTopics in self.topicsInterest.pairs:
|
||||
for sub in self.subscriptionService.getActiveSubscriptions():
|
||||
let storeResp: StoreQueryResponse = (
|
||||
await self.node.wakuStoreClient.queryToAny(
|
||||
StoreQueryRequest(
|
||||
includeData: false,
|
||||
pubsubTopic: some(PubsubTopic(pubsubTopic)),
|
||||
contentTopics: cTopics,
|
||||
pubsubTopic: some(PubsubTopic(sub.pubsubTopic)),
|
||||
contentTopics: sub.contentTopics,
|
||||
startTime: some(self.startTimeToCheck - DelayExtra.nanos),
|
||||
endTime: some(self.endTimeToCheck + DelayExtra.nanos),
|
||||
)
|
||||
)
|
||||
).valueOr:
|
||||
error "msgChecker failed to get remote msgHashes",
|
||||
pubsubTopic, cTopics, error = $error
|
||||
pubsubTopic = sub.pubsubTopic, cTopics = sub.contentTopics, error = $error
|
||||
continue
|
||||
|
||||
msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash))
|
||||
@ -133,29 +129,18 @@ proc msgChecker(self: RecvService) {.async.} =
|
||||
## update next check times
|
||||
self.startTimeToCheck = self.endTimeToCheck
|
||||
|
||||
proc onSubscribe(
|
||||
self: RecvService, pubsubTopic: string, contentTopics: seq[string]
|
||||
) {.gcsafe, raises: [].} =
|
||||
info "onSubscribe", pubsubTopic, contentTopics
|
||||
self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest):
|
||||
contentTopicsOfInterest[].add(contentTopics)
|
||||
do:
|
||||
self.topicsInterest[pubsubTopic] = contentTopics
|
||||
proc processIncomingMessageOfInterest(
|
||||
self: RecvService, pubsubTopic: string, message: WakuMessage
|
||||
) =
|
||||
## Resolve an incoming network message that was already filtered by topic.
|
||||
## Deduplicate (by hash), store (saves in recently-seen messages) and emit
|
||||
## the MAPI MessageReceivedEvent for every unique incoming message.
|
||||
|
||||
proc onUnsubscribe(
|
||||
self: RecvService, pubsubTopic: string, contentTopics: seq[string]
|
||||
) {.gcsafe, raises: [].} =
|
||||
info "onUnsubscribe", pubsubTopic, contentTopics
|
||||
|
||||
self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest):
|
||||
let remainingCTopics =
|
||||
contentTopicsOfInterest[].filterIt(not contentTopics.contains(it))
|
||||
contentTopicsOfInterest[] = remainingCTopics
|
||||
|
||||
if remainingCTopics.len == 0:
|
||||
self.topicsInterest.del(pubsubTopic)
|
||||
do:
|
||||
error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics
|
||||
let msgHash = computeMessageHash(pubsubTopic, message)
|
||||
if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash):
|
||||
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
|
||||
self.recentReceivedMsgs.add(rxMsg)
|
||||
MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message)
|
||||
|
||||
proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T =
|
||||
## The storeClient will help to acquire any possible missed messages
|
||||
@ -166,21 +151,12 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T =
|
||||
startTimeToCheck: now,
|
||||
brokerCtx: node.brokerCtx,
|
||||
subscriptionService: s,
|
||||
topicsInterest: initTable[PubsubTopic, seq[ContentTopic]](),
|
||||
recentReceivedMsgs: @[],
|
||||
)
|
||||
|
||||
if not node.wakuFilterClient.isNil():
|
||||
let filterPushHandler = proc(
|
||||
pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
) {.async, closure.} =
|
||||
## Captures all the messages received through filter
|
||||
|
||||
let msgHash = computeMessageHash(pubSubTopic, message)
|
||||
let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp)
|
||||
recvService.recentReceivedMsgs.add(rxMsg)
|
||||
|
||||
node.wakuFilterClient.registerPushHandler(filterPushHandler)
|
||||
# TODO: For MAPI Edge support, either call node.wakuFilterClient.registerPushHandler
|
||||
# so that the RecvService listens to incoming filter messages,
|
||||
# or have the filter client emit MessageSeenEvent.
|
||||
|
||||
return recvService
|
||||
|
||||
@ -194,26 +170,24 @@ proc startRecvService*(self: RecvService) =
|
||||
self.msgCheckerHandler = self.msgChecker()
|
||||
self.msgPrunerHandler = self.loopPruneOldMessages()
|
||||
|
||||
self.onSubscribeListener = OnFilterSubscribeEvent.listen(
|
||||
self.seenMsgListener = MessageSeenEvent.listen(
|
||||
self.brokerCtx,
|
||||
proc(subsEv: OnFilterSubscribeEvent) {.async: (raises: []).} =
|
||||
self.onSubscribe(subsEv.pubsubTopic, subsEv.contentTopics),
|
||||
).valueOr:
|
||||
error "Failed to set OnFilterSubscribeEvent listener", error = error
|
||||
quit(QuitFailure)
|
||||
proc(event: MessageSeenEvent) {.async: (raises: []).} =
|
||||
if not self.subscriptionService.isSubscribed(
|
||||
event.topic, event.message.contentTopic
|
||||
):
|
||||
return
|
||||
|
||||
self.onUnsubscribeListener = OnFilterUnsubscribeEvent.listen(
|
||||
self.brokerCtx,
|
||||
proc(subsEv: OnFilterUnsubscribeEvent) {.async: (raises: []).} =
|
||||
self.onUnsubscribe(subsEv.pubsubTopic, subsEv.contentTopics),
|
||||
self.processIncomingMessageOfInterest(event.topic, event.message),
|
||||
).valueOr:
|
||||
error "Failed to set OnFilterUnsubscribeEvent listener", error = error
|
||||
error "Failed to set MessageSeenEvent listener", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
proc stopRecvService*(self: RecvService) {.async.} =
|
||||
OnFilterSubscribeEvent.dropListener(self.brokerCtx, self.onSubscribeListener)
|
||||
OnFilterUnsubscribeEvent.dropListener(self.brokerCtx, self.onUnsubscribeListener)
|
||||
MessageSeenEvent.dropListener(self.brokerCtx, self.seenMsgListener)
|
||||
if not self.msgCheckerHandler.isNil():
|
||||
await self.msgCheckerHandler.cancelAndWait()
|
||||
self.msgCheckerHandler = nil
|
||||
if not self.msgPrunerHandler.isNil():
|
||||
await self.msgPrunerHandler.cancelAndWait()
|
||||
self.msgPrunerHandler = nil
|
||||
|
||||
@ -250,9 +250,9 @@ proc serviceLoop(self: SendService) {.async.} =
|
||||
proc startSendService*(self: SendService) =
|
||||
self.serviceLoopHandle = self.serviceLoop()
|
||||
|
||||
proc stopSendService*(self: SendService) =
|
||||
proc stopSendService*(self: SendService) {.async.} =
|
||||
if not self.serviceLoopHandle.isNil():
|
||||
discard self.serviceLoopHandle.cancelAndWait()
|
||||
await self.serviceLoopHandle.cancelAndWait()
|
||||
|
||||
proc send*(self: SendService, task: DeliveryTask) {.async.} =
|
||||
assert(not task.isNil(), "task for send must not be nil")
|
||||
|
||||
@ -1,64 +1,175 @@
|
||||
import chronos, chronicles
|
||||
import std/[sets, tables, options, strutils], chronos, chronicles, results
|
||||
import
|
||||
waku/[
|
||||
waku_core,
|
||||
waku_core/topics,
|
||||
events/message_events,
|
||||
waku_core/topics/sharding,
|
||||
waku_node,
|
||||
waku_relay,
|
||||
common/broker/broker_context,
|
||||
events/delivery_events,
|
||||
]
|
||||
|
||||
type SubscriptionService* = ref object of RootObj
|
||||
brokerCtx: BrokerContext
|
||||
node: WakuNode
|
||||
contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
|
||||
## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only.
|
||||
## A present key with an empty HashSet value means pubsubtopic already subscribed
|
||||
## (via subscribePubsubTopics()) but there's no specific content topic interest yet.
|
||||
|
||||
proc new*(T: typedesc[SubscriptionService], node: WakuNode): T =
|
||||
## The storeClient will help to acquire any possible missed messages
|
||||
SubscriptionService(
|
||||
node: node, contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]]()
|
||||
)
|
||||
|
||||
return SubscriptionService(brokerCtx: node.brokerCtx, node: node)
|
||||
proc addContentTopicInterest(
|
||||
self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic
|
||||
): Result[void, string] =
|
||||
if not self.contentTopicSubs.hasKey(shard):
|
||||
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
|
||||
|
||||
self.contentTopicSubs.withValue(shard, cTopics):
|
||||
if not cTopics[].contains(topic):
|
||||
cTopics[].incl(topic)
|
||||
|
||||
# TODO: Call a "subscribe(shard, topic)" on filter client here,
|
||||
# so the filter client can know that subscriptions changed.
|
||||
|
||||
return ok()
|
||||
|
||||
proc removeContentTopicInterest(
|
||||
self: SubscriptionService, shard: PubsubTopic, topic: ContentTopic
|
||||
): Result[void, string] =
|
||||
self.contentTopicSubs.withValue(shard, cTopics):
|
||||
if cTopics[].contains(topic):
|
||||
cTopics[].excl(topic)
|
||||
|
||||
if cTopics[].len == 0 and isNil(self.node.wakuRelay):
|
||||
self.contentTopicSubs.del(shard) # We're done with cTopics here
|
||||
|
||||
# TODO: Call a "unsubscribe(shard, topic)" on filter client here,
|
||||
# so the filter client can know that subscriptions changed.
|
||||
|
||||
return ok()
|
||||
|
||||
proc subscribePubsubTopics*(
|
||||
self: SubscriptionService, shards: seq[PubsubTopic]
|
||||
): Result[void, string] =
|
||||
if isNil(self.node.wakuRelay):
|
||||
return err("subscribeShard requires a Relay")
|
||||
|
||||
var errors: seq[string] = @[]
|
||||
|
||||
for shard in shards:
|
||||
if not self.contentTopicSubs.hasKey(shard):
|
||||
self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr:
|
||||
errors.add("shard " & shard & ": " & error)
|
||||
continue
|
||||
|
||||
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
|
||||
|
||||
if errors.len > 0:
|
||||
return err("subscribeShard errors: " & errors.join("; "))
|
||||
|
||||
return ok()
|
||||
|
||||
proc startSubscriptionService*(self: SubscriptionService) =
|
||||
if not isNil(self.node.wakuRelay):
|
||||
if self.node.wakuAutoSharding.isSome():
|
||||
# Subscribe relay to all shards in autosharding.
|
||||
let autoSharding = self.node.wakuAutoSharding.get()
|
||||
let clusterId = autoSharding.clusterId
|
||||
let numShards = autoSharding.shardCountGenZero
|
||||
|
||||
if numShards > 0:
|
||||
var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)
|
||||
|
||||
for i in 0 ..< numShards:
|
||||
let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
|
||||
clusterPubsubTopics.add(PubsubTopic($shardObj))
|
||||
|
||||
self.subscribePubsubTopics(clusterPubsubTopics).isOkOr:
|
||||
error "Failed to auto-subscribe Relay to cluster shards: ", error = error
|
||||
else:
|
||||
# NOTE: We can't fallback to configured shards when no autosharding here since
|
||||
# we don't currently have access to Waku.conf here. However, we don't support
|
||||
# manual/static sharding at the MAPI level anyway so wiring that up now is not needed.
|
||||
# When we no longer auto-subscribe to all shards in Core boot, we will probably
|
||||
# scan the shard config due to fleet nodes; then shard conf will have to be reachable here.
|
||||
# For non-fleet, interactive Core nodes (e.g. Desktop apps) this can't matter
|
||||
# as much since shard subscriptions originate from subscription to content topics, but
|
||||
# I guess even in that case subbing to some conf shards may make sense for some apps.
|
||||
info "SubscriptionService has no AutoSharding for Relay, won't subscribe to shards by default."
|
||||
|
||||
discard
|
||||
|
||||
proc stopSubscriptionService*(self: SubscriptionService) {.async.} =
|
||||
discard
|
||||
|
||||
proc getActiveSubscriptions*(
|
||||
self: SubscriptionService
|
||||
): seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] =
|
||||
var activeSubs: seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] =
|
||||
@[]
|
||||
|
||||
for pubsub, cTopicSet in self.contentTopicSubs.pairs:
|
||||
if cTopicSet.len > 0:
|
||||
var cTopicSeq = newSeqOfCap[ContentTopic](cTopicSet.len)
|
||||
for t in cTopicSet:
|
||||
cTopicSeq.add(t)
|
||||
activeSubs.add((pubsub, cTopicSeq))
|
||||
|
||||
return activeSubs
|
||||
|
||||
proc getShardForContentTopic(
|
||||
self: SubscriptionService, topic: ContentTopic
|
||||
): Result[PubsubTopic, string] =
|
||||
if self.node.wakuAutoSharding.isSome():
|
||||
let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
|
||||
return ok($shardObj)
|
||||
|
||||
return err("SubscriptionService requires AutoSharding")
|
||||
|
||||
proc isSubscribed*(
|
||||
self: SubscriptionService, topic: ContentTopic
|
||||
): Result[bool, string] =
|
||||
var isSubscribed = false
|
||||
if self.node.wakuRelay.isNil() == false:
|
||||
return self.node.isSubscribed((kind: ContentSub, topic: topic))
|
||||
let shard = ?self.getShardForContentTopic(topic)
|
||||
return ok(
|
||||
self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(topic)
|
||||
)
|
||||
|
||||
# TODO: Add support for edge mode with Filter subscription management
|
||||
return ok(isSubscribed)
|
||||
|
||||
#TODO: later PR may consider to refactor or place this function elsewhere
|
||||
# The only important part is that it emits MessageReceivedEvent
|
||||
proc getReceiveHandler(self: SubscriptionService): WakuRelayHandler =
|
||||
return proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
|
||||
let msgHash = computeMessageHash(topic, msg).to0xHex()
|
||||
info "API received message",
|
||||
pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash
|
||||
|
||||
MessageReceivedEvent.emit(self.brokerCtx, msgHash, msg)
|
||||
proc isSubscribed*(
|
||||
self: SubscriptionService, shard: PubsubTopic, contentTopic: ContentTopic
|
||||
): bool {.raises: [].} =
|
||||
try:
|
||||
return
|
||||
self.contentTopicSubs.hasKey(shard) and
|
||||
self.contentTopicSubs[shard].contains(contentTopic)
|
||||
except KeyError:
|
||||
discard
|
||||
|
||||
proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] =
|
||||
let isSubscribed = self.isSubscribed(topic).valueOr:
|
||||
error "Failed to check subscription status: ", error = error
|
||||
return err("Failed to check subscription status: " & error)
|
||||
if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
|
||||
return err("SubscriptionService requires either Relay or Filter Client.")
|
||||
|
||||
if isSubscribed == false:
|
||||
if self.node.wakuRelay.isNil() == false:
|
||||
self.node.subscribe((kind: ContentSub, topic: topic), self.getReceiveHandler()).isOkOr:
|
||||
error "Failed to subscribe: ", error = error
|
||||
return err("Failed to subscribe: " & error)
|
||||
let shard = ?self.getShardForContentTopic(topic)
|
||||
|
||||
# TODO: Add support for edge mode with Filter subscription management
|
||||
if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard):
|
||||
?self.subscribePubsubTopics(@[shard])
|
||||
|
||||
?self.addContentTopicInterest(shard, topic)
|
||||
|
||||
return ok()
|
||||
|
||||
proc unsubscribe*(
|
||||
self: SubscriptionService, topic: ContentTopic
|
||||
): Result[void, string] =
|
||||
if self.node.wakuRelay.isNil() == false:
|
||||
self.node.unsubscribe((kind: ContentSub, topic: topic)).isOkOr:
|
||||
error "Failed to unsubscribe: ", error = error
|
||||
return err("Failed to unsubscribe: " & error)
|
||||
if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
|
||||
return err("SubscriptionService requires either Relay or Filter Client.")
|
||||
|
||||
let shard = ?self.getShardForContentTopic(topic)
|
||||
|
||||
if self.isSubscribed(shard, topic):
|
||||
?self.removeContentTopicInterest(shard, topic)
|
||||
|
||||
# TODO: Add support for edge mode with Filter subscription management
|
||||
return ok()
|
||||
|
||||
@ -19,16 +19,20 @@ import
|
||||
libp2p/utility
|
||||
|
||||
import
|
||||
../waku_node,
|
||||
../../waku_relay,
|
||||
../../waku_core,
|
||||
../../waku_core/topics/sharding,
|
||||
../../waku_filter_v2,
|
||||
../../waku_archive_legacy,
|
||||
../../waku_archive,
|
||||
../../waku_store_sync,
|
||||
../peer_manager,
|
||||
../../waku_rln_relay
|
||||
waku/[
|
||||
waku_relay,
|
||||
waku_core,
|
||||
waku_core/topics/sharding,
|
||||
waku_filter_v2,
|
||||
waku_archive_legacy,
|
||||
waku_archive,
|
||||
waku_store_sync,
|
||||
waku_rln_relay,
|
||||
node/waku_node,
|
||||
node/peer_manager,
|
||||
common/broker/broker_context,
|
||||
events/message_events,
|
||||
]
|
||||
|
||||
export waku_relay.WakuRelayHandler
|
||||
|
||||
@ -44,14 +48,25 @@ logScope:
|
||||
## Waku relay
|
||||
|
||||
proc registerRelayHandler(
|
||||
node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler
|
||||
) =
|
||||
node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler = nil
|
||||
): bool =
|
||||
## Registers the only handler for the given topic.
|
||||
## Notice that this handler internally calls other handlers, such as filter,
|
||||
## archive, etc, plus the handler provided by the application.
|
||||
## Returns `true` if a mesh subscription was created or `false` if the relay
|
||||
## was already subscribed to the topic.
|
||||
|
||||
if node.wakuRelay.isSubscribed(topic):
|
||||
return
|
||||
let alreadySubscribed = node.wakuRelay.isSubscribed(topic)
|
||||
|
||||
if not appHandler.isNil():
|
||||
if not alreadySubscribed or not node.legacyAppHandlers.hasKey(topic):
|
||||
node.legacyAppHandlers[topic] = appHandler
|
||||
else:
|
||||
debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler",
|
||||
topic = topic
|
||||
|
||||
if alreadySubscribed:
|
||||
return false
|
||||
|
||||
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
let msgSizeKB = msg.payload.len / 1000
|
||||
@ -82,6 +97,9 @@ proc registerRelayHandler(
|
||||
|
||||
node.wakuStoreReconciliation.messageIngress(topic, msg)
|
||||
|
||||
proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
MessageSeenEvent.emit(node.brokerCtx, topic, msg)
|
||||
|
||||
let uniqueTopicHandler = proc(
|
||||
topic: PubsubTopic, msg: WakuMessage
|
||||
): Future[void] {.async, gcsafe.} =
|
||||
@ -89,7 +107,15 @@ proc registerRelayHandler(
|
||||
await filterHandler(topic, msg)
|
||||
await archiveHandler(topic, msg)
|
||||
await syncHandler(topic, msg)
|
||||
await appHandler(topic, msg)
|
||||
await internalHandler(topic, msg)
|
||||
|
||||
# Call the legacy (kernel API) app handler if it exists.
|
||||
# Normally, hasKey is false and the MessageSeenEvent bus (new API) is used instead.
|
||||
# But we need to support legacy behavior (kernel API use), hence this.
|
||||
# NOTE: We can delete `legacyAppHandlers` if instead we refactor WakuRelay to support multiple
|
||||
# PubsubTopic handlers, since that's actually supported by libp2p PubSub (bigger refactor...)
|
||||
if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil():
|
||||
await node.legacyAppHandlers[topic](topic, msg)
|
||||
|
||||
node.wakuRelay.subscribe(topic, uniqueTopicHandler)
|
||||
|
||||
@ -115,8 +141,11 @@ proc subscribe*(
|
||||
): Result[void, string] =
|
||||
## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on
|
||||
## this topic. WakuRelayHandler is a method that takes a topic and a Waku message.
|
||||
## If `handler` is nil, the API call will subscribe to the topic in the relay mesh
|
||||
## but no app handler will be registered at this time (it can be registered later with
|
||||
## another call to this proc for the same gossipsub topic).
|
||||
|
||||
if node.wakuRelay.isNil():
|
||||
if isNil(node.wakuRelay):
|
||||
error "Invalid API call to `subscribe`. WakuRelay not mounted."
|
||||
return err("Invalid API call to `subscribe`. WakuRelay not mounted.")
|
||||
|
||||
@ -124,13 +153,15 @@ proc subscribe*(
|
||||
error "Failed to decode subscription event", error = error
|
||||
return err("Failed to decode subscription event: " & error)
|
||||
|
||||
if node.wakuRelay.isSubscribed(pubsubTopic):
|
||||
warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic
|
||||
return ok()
|
||||
|
||||
info "subscribe", pubsubTopic, contentTopicOp
|
||||
node.registerRelayHandler(pubsubTopic, handler)
|
||||
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
|
||||
if node.registerRelayHandler(pubsubTopic, handler):
|
||||
info "subscribe", pubsubTopic, contentTopicOp
|
||||
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
|
||||
else:
|
||||
if isNil(handler):
|
||||
warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic
|
||||
else:
|
||||
info "subscribe (was already subscribed in the mesh; appHandler set)",
|
||||
pubsubTopic = pubsubTopic
|
||||
|
||||
return ok()
|
||||
|
||||
@ -138,8 +169,10 @@ proc unsubscribe*(
|
||||
node: WakuNode, subscription: SubscriptionEvent
|
||||
): Result[void, string] =
|
||||
## Unsubscribes from a specific PubSub or Content topic.
|
||||
## This will both unsubscribe from the relay mesh and remove the app handler, if any.
|
||||
## NOTE: This works because using MAPI and Kernel API at the same time is unsupported.
|
||||
|
||||
if node.wakuRelay.isNil():
|
||||
if isNil(node.wakuRelay):
|
||||
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
|
||||
return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.")
|
||||
|
||||
@ -147,13 +180,20 @@ proc unsubscribe*(
|
||||
error "Failed to decode unsubscribe event", error = error
|
||||
return err("Failed to decode unsubscribe event: " & error)
|
||||
|
||||
if not node.wakuRelay.isSubscribed(pubsubTopic):
|
||||
warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic
|
||||
return ok()
|
||||
let hadHandler = node.legacyAppHandlers.hasKey(pubsubTopic)
|
||||
if hadHandler:
|
||||
node.legacyAppHandlers.del(pubsubTopic)
|
||||
|
||||
info "unsubscribe", pubsubTopic, contentTopicOp
|
||||
node.wakuRelay.unsubscribe(pubsubTopic)
|
||||
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
|
||||
if node.wakuRelay.isSubscribed(pubsubTopic):
|
||||
info "unsubscribe", pubsubTopic, contentTopicOp
|
||||
node.wakuRelay.unsubscribe(pubsubTopic)
|
||||
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
|
||||
else:
|
||||
if not hadHandler:
|
||||
warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic
|
||||
else:
|
||||
info "unsubscribe (was not subscribed in the mesh; appHandler removed)",
|
||||
pubsubTopic = pubsubTopic
|
||||
|
||||
return ok()
|
||||
|
||||
|
||||
@ -146,6 +146,8 @@ type
|
||||
started*: bool # Indicates that node has started listening
|
||||
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
|
||||
rateLimitSettings*: ProtocolRateLimitSettings
|
||||
legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler]
|
||||
## Kernel API Relay appHandlers (if any)
|
||||
wakuMix*: WakuMix
|
||||
edgeTopicsHealth*: Table[PubsubTopic, TopicHealth]
|
||||
edgeHealthEvent*: AsyncEvent
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user