mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-02-27 21:53:16 +00:00
Add fixed relay/core shard subscriptions and fix test
This commit is contained in:
parent
0eb6aa07c6
commit
10673afc95
@ -1,45 +1,53 @@
|
||||
{.used.}
|
||||
|
||||
import std/strutils
|
||||
import std/[strutils, net, options]
|
||||
import chronos, testutils/unittests, stew/byteutils
|
||||
import ../testlib/[common, testasync]
|
||||
import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto]
|
||||
import ../testlib/[common, wakucore, wakunode, testasync]
|
||||
|
||||
import
|
||||
waku, waku/[waku_node, waku_core, common/broker/broker_context, events/message_events]
|
||||
import waku/api/api_conf, waku/factory/waku_conf
|
||||
|
||||
const TestTimeout = chronos.seconds(10)
|
||||
const DefaultShard = PubsubTopic("/waku/2/rs/1/0")
|
||||
|
||||
type ReceiveEventListenerManager = ref object
|
||||
brokerCtx: BrokerContext
|
||||
receivedListener: MessageReceivedEventListener
|
||||
receivedFuture: Future[void]
|
||||
receivedEvent: AsyncEvent
|
||||
receivedMessages: seq[WakuMessage]
|
||||
targetCount: int
|
||||
|
||||
proc newReceiveEventListenerManager(
|
||||
brokerCtx: BrokerContext
|
||||
brokerCtx: BrokerContext, expectedCount: int = 1
|
||||
): ReceiveEventListenerManager =
|
||||
let manager = ReceiveEventListenerManager(brokerCtx: brokerCtx, receivedMessages: @[])
|
||||
manager.receivedFuture = newFuture[void]("receivedEvent")
|
||||
let manager = ReceiveEventListenerManager(
|
||||
brokerCtx: brokerCtx, receivedMessages: @[], targetCount: expectedCount
|
||||
)
|
||||
manager.receivedEvent = newAsyncEvent()
|
||||
|
||||
manager.receivedListener = MessageReceivedEvent.listen(
|
||||
brokerCtx,
|
||||
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
|
||||
manager.receivedMessages.add(event.message)
|
||||
echo "RECEIVED EVENT TRIGGERED: contentTopic=", event.message.contentTopic
|
||||
manager.receivedListener = MessageReceivedEvent
|
||||
.listen(
|
||||
brokerCtx,
|
||||
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
|
||||
manager.receivedMessages.add(event.message)
|
||||
|
||||
if not manager.receivedFuture.finished():
|
||||
manager.receivedFuture.complete()
|
||||
,
|
||||
).valueOr:
|
||||
raiseAssert error
|
||||
if manager.receivedMessages.len >= manager.targetCount:
|
||||
manager.receivedEvent.fire()
|
||||
,
|
||||
)
|
||||
.expect("Failed to listen to MessageReceivedEvent")
|
||||
|
||||
return manager
|
||||
|
||||
proc teardown(manager: ReceiveEventListenerManager) =
|
||||
MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener)
|
||||
|
||||
proc waitForEvent(
|
||||
proc waitForEvents(
|
||||
manager: ReceiveEventListenerManager, timeout: Duration
|
||||
): Future[bool] {.async.} =
|
||||
return await manager.receivedFuture.withTimeout(timeout)
|
||||
return await manager.receivedEvent.wait().withTimeout(timeout)
|
||||
|
||||
proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
|
||||
let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0)
|
||||
@ -54,56 +62,99 @@ proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
|
||||
p2pReliability = true,
|
||||
)
|
||||
|
||||
suite "Waku API - Subscription Service":
|
||||
asyncTest "Subscription API, two relays with subscribe and receive message":
|
||||
var node1, node2: Waku
|
||||
proc setupSubscriberNode(conf: NodeConfig): Future[Waku] {.async.} =
|
||||
var node: Waku
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await createNode(conf)).expect("Failed to create subscriber node")
|
||||
(await startWaku(addr node)).expect("Failed to start subscriber node")
|
||||
return node
|
||||
|
||||
proc publishWhenMeshReady(
|
||||
publisher: WakuNode,
|
||||
pubsubTopic: PubsubTopic,
|
||||
contentTopic: ContentTopic,
|
||||
payload: seq[byte],
|
||||
maxRetries: int = 50,
|
||||
retryDelay: Duration = 200.milliseconds,
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
for _ in 0 ..< maxRetries:
|
||||
let msg = WakuMessage(
|
||||
payload: payload, contentTopic: contentTopic, version: 0, timestamp: now()
|
||||
)
|
||||
|
||||
let publishRes = await publisher.publish(some(pubsubTopic), msg)
|
||||
if publishRes.isOk() and publishRes.value > 0:
|
||||
return publishRes
|
||||
|
||||
await sleepAsync(retryDelay)
|
||||
|
||||
return err("Timed out waiting for mesh")
|
||||
|
||||
suite "Messaging API, SubscriptionService":
|
||||
var
|
||||
publisherNode {.threadvar.}: WakuNode
|
||||
publisherPeerInfo {.threadvar.}: RemotePeerInfo
|
||||
publisherPeerId {.threadvar.}: PeerId
|
||||
|
||||
subscriberNode {.threadvar.}: Waku
|
||||
|
||||
asyncSetup:
|
||||
lockNewGlobalBrokerContext:
|
||||
node1 = (await createNode(createApiNodeConf())).valueOr:
|
||||
raiseAssert error
|
||||
(await startWaku(addr node1)).isOkOr:
|
||||
raiseAssert "Failed to start node1"
|
||||
publisherNode =
|
||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||
|
||||
lockNewGlobalBrokerContext:
|
||||
node2 = (await createNode(createApiNodeConf())).valueOr:
|
||||
raiseAssert error
|
||||
(await startWaku(addr node2)).isOkOr:
|
||||
raiseAssert "Failed to start node2"
|
||||
publisherNode.mountMetadata(1, @[0'u16]).expect("Failed to mount metadata")
|
||||
(await publisherNode.mountRelay()).expect("Failed to mount relay")
|
||||
await publisherNode.mountLibp2pPing()
|
||||
await publisherNode.start()
|
||||
|
||||
let node2PeerInfo = node2.node.peerInfo.toRemotePeerInfo()
|
||||
await node1.node.connectToNodes(@[node2PeerInfo])
|
||||
publisherPeerInfo = publisherNode.peerInfo.toRemotePeerInfo()
|
||||
publisherPeerId = publisherNode.peerInfo.peerId
|
||||
|
||||
await sleepAsync(2.seconds)
|
||||
proc dummyHandler(
|
||||
topic: PubsubTopic, msg: WakuMessage
|
||||
): Future[void] {.async, gcsafe.} =
|
||||
discard
|
||||
|
||||
publisherNode.subscribe((kind: PubsubSub, topic: DefaultShard), dummyHandler).expect(
|
||||
"Failed to subscribe publisherNode"
|
||||
)
|
||||
|
||||
asyncTeardown:
|
||||
if not subscriberNode.isNil():
|
||||
(await subscriberNode.stop()).expect("Failed to stop subscriber node")
|
||||
subscriberNode = nil
|
||||
|
||||
if not publisherNode.isNil():
|
||||
await publisherNode.stop()
|
||||
publisherNode = nil
|
||||
|
||||
asyncTest "Subscription API, relay node auto subscribe and receive message":
|
||||
subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core))
|
||||
await subscriberNode.node.connectToNodes(@[publisherPeerInfo])
|
||||
let testTopic = ContentTopic("/waku/2/test-content/proto")
|
||||
|
||||
(await node1.subscribe(testTopic)).isOkOr:
|
||||
raiseAssert "Node1 failed to subscribe: " & error
|
||||
(await subscriberNode.subscribe(testTopic)).expect(
|
||||
"subscriberNode failed to subscribe"
|
||||
)
|
||||
|
||||
(await node2.subscribe(testTopic)).isOkOr:
|
||||
raiseAssert "Node2 failed to subscribe: " & error
|
||||
|
||||
await sleepAsync(2.seconds)
|
||||
|
||||
let eventManager = newReceiveEventListenerManager(node2.brokerCtx)
|
||||
let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
|
||||
defer:
|
||||
eventManager.teardown()
|
||||
|
||||
let envelope = MessageEnvelope.init(testTopic, "hello world payload")
|
||||
let sendResult = await node1.send(envelope)
|
||||
check sendResult.isOk()
|
||||
const testMessageStr = "Hello, world!"
|
||||
let msgPayload = testMessageStr.toBytes()
|
||||
|
||||
const eventTimeout = 5.seconds
|
||||
let receivedInTime = await eventManager.waitForEvent(eventTimeout)
|
||||
discard (
|
||||
await publishWhenMeshReady(publisherNode, DefaultShard, testTopic, msgPayload)
|
||||
).expect("Timed out waiting for mesh to stabilize")
|
||||
|
||||
check receivedInTime == true
|
||||
check eventManager.receivedMessages.len == 1
|
||||
let receivedInTime = await eventManager.waitForEvents(TestTimeout)
|
||||
|
||||
# Hard abort if these conditions aren't met to prevent an IndexDefect below
|
||||
require receivedInTime
|
||||
require eventManager.receivedMessages.len == 1
|
||||
|
||||
let receivedMsg = eventManager.receivedMessages[0]
|
||||
check receivedMsg.contentTopic == testTopic
|
||||
check string.fromBytes(receivedMsg.payload) == "hello world payload"
|
||||
|
||||
(await node1.stop()).isOkOr:
|
||||
raiseAssert error
|
||||
(await node2.stop()).isOkOr:
|
||||
raiseAssert error
|
||||
check string.fromBytes(receivedMsg.payload) == testMessageStr
|
||||
|
||||
@ -35,6 +35,7 @@ import
|
||||
node/health_monitor,
|
||||
node/waku_metrics,
|
||||
node/delivery_service/delivery_service,
|
||||
node/delivery_service/subscription_service,
|
||||
rest_api/message_cache,
|
||||
rest_api/endpoint/server,
|
||||
rest_api/endpoint/builder as rest_server_builder,
|
||||
@ -416,6 +417,37 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
|
||||
if not waku[].deliveryService.isNil():
|
||||
waku[].deliveryService.startDeliveryService()
|
||||
|
||||
## Subscription Service
|
||||
if not waku.node.wakuRelay.isNil() and not waku.deliveryService.isNil():
|
||||
let subService = waku.deliveryService.subscriptionService
|
||||
|
||||
if waku.node.wakuAutoSharding.isSome():
|
||||
# Subscribe relay to all shards in autosharding.
|
||||
let autoSharding = waku.node.wakuAutoSharding.get()
|
||||
let clusterId = autoSharding.clusterId
|
||||
let numShards = autoSharding.shardCountGenZero
|
||||
|
||||
if numShards > 0:
|
||||
var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)
|
||||
|
||||
for i in 0 ..< numShards:
|
||||
let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
|
||||
clusterPubsubTopics.add(PubsubTopic($shardObj))
|
||||
|
||||
subService.subscribeShard(clusterPubsubTopics).isOkOr:
|
||||
return err("Failed to auto-subscribe Relay to cluster shards: " & error)
|
||||
else:
|
||||
# Fallback to configured shards when no autosharding.
|
||||
if waku.conf.subscribeShards.len > 0:
|
||||
let manualShards = waku.conf.subscribeShards.mapIt(
|
||||
PubsubTopic(
|
||||
$(RelayShard(clusterId: waku.conf.clusterId, shardId: uint16(it)))
|
||||
)
|
||||
)
|
||||
|
||||
subService.subscribeShard(manualShards).isOkOr:
|
||||
return err("Failed to subscribe Relay to manual shards: " & error)
|
||||
|
||||
## Health Monitor
|
||||
waku[].healthMonitor.startHealthMonitor().isOkOr:
|
||||
return err("failed to start health monitor: " & $error)
|
||||
@ -450,7 +482,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
|
||||
).isOkOr:
|
||||
error "Failed to set RequestProtocolHealth provider", error = error
|
||||
|
||||
## Setup RequestHealthReport provider (The lost child)
|
||||
## Setup RequestHealthReport provider
|
||||
|
||||
RequestHealthReport.setProvider(
|
||||
globalBrokerContext(),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user