From b8867dee38278d709f9a04cb66bab47b96371e1b Mon Sep 17 00:00:00 2001 From: fryorcraken Date: Sat, 19 Jul 2025 14:24:30 +1000 Subject: [PATCH] test: re-introduce usage of ensureSubscriptions --- packages/interfaces/src/sharding.ts | 10 ++++- .../tests/high-throughput.spec.ts | 14 ++++++- .../reliability-tests/tests/longevity.spec.ts | 14 ++++++- .../tests/throughput-sizes.spec.ts | 14 ++++++- packages/tests/src/lib/runNodes.ts | 11 +---- packages/tests/src/utils/nodes.ts | 41 ++++++++++++++++--- .../tests/tests/filter/subscribe.node.spec.ts | 10 +++-- 7 files changed, 88 insertions(+), 26 deletions(-) diff --git a/packages/interfaces/src/sharding.ts b/packages/interfaces/src/sharding.ts index ee5a0f0bb4..3cbe86d6c7 100644 --- a/packages/interfaces/src/sharding.ts +++ b/packages/interfaces/src/sharding.ts @@ -25,10 +25,18 @@ export type ShardId = number; export interface IRoutingInfoAutoSharding { pubsubTopic: string; shardId: ShardId; + + // Is the network config really needed for exposure? + // we should probably aim to only expose the above + Cluster Id networkConfig: AutoSharding; - contentTopic: string; + + // This is actually a property of network config, should probably be removed isAutoSharding: boolean; isStaticSharding: boolean; + + // This is only needed for tests, to setup nwaku node + // might be a cleaner way to handle it + contentTopic: string; } export interface IRoutingInfoStaticSharding { diff --git a/packages/reliability-tests/tests/high-throughput.spec.ts b/packages/reliability-tests/tests/high-throughput.spec.ts index 357efed5f7..48bb20f5a3 100644 --- a/packages/reliability-tests/tests/high-throughput.spec.ts +++ b/packages/reliability-tests/tests/high-throughput.spec.ts @@ -1,6 +1,10 @@ import { LightNode, Protocols } from "@waku/interfaces"; import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk"; -import { createRoutingInfo, delay } from "@waku/utils"; +import { + contentTopicToPubsubTopic, + createRoutingInfo, + delay +} from "@waku/utils"; import { expect } from "chai"; import { @@ -58,7 +62,13 @@ describe("High Throughput Messaging", function () { await delay(1000); - // TODO await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo)); + await nwaku.ensureSubscriptions([ + contentTopicToPubsubTopic( + ContentTopic, + networkConfig.clusterId, + networkConfig.numShardsInCluster + ) + ]); waku = await createLightNode({ networkConfig }); await waku.start(); diff --git a/packages/reliability-tests/tests/longevity.spec.ts b/packages/reliability-tests/tests/longevity.spec.ts index 3e7848842f..e0ec05678f 100644 --- a/packages/reliability-tests/tests/longevity.spec.ts +++ b/packages/reliability-tests/tests/longevity.spec.ts @@ -1,6 +1,10 @@ import { LightNode, Protocols } from "@waku/interfaces"; import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk"; -import { createRoutingInfo, delay } from "@waku/utils"; +import { + contentTopicToPubsubTopic, + createRoutingInfo, + delay +} from "@waku/utils"; import { expect } from "chai"; import { @@ -57,7 +61,13 @@ describe("Longevity", function () { { retries: 3 } ); - // TODO await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo)); + await nwaku.ensureSubscriptions([ + contentTopicToPubsubTopic( + ContentTopic, + networkConfig.clusterId, + networkConfig.numShardsInCluster + ) + ]); waku = await createLightNode({ networkConfig }); await waku.start(); diff --git a/packages/reliability-tests/tests/throughput-sizes.spec.ts b/packages/reliability-tests/tests/throughput-sizes.spec.ts index 6d556adbd9..7044176223 100644 --- a/packages/reliability-tests/tests/throughput-sizes.spec.ts +++ b/packages/reliability-tests/tests/throughput-sizes.spec.ts @@ -1,6 +1,10 @@ import { LightNode, Protocols } from "@waku/interfaces"; import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk"; -import { createRoutingInfo, delay } from "@waku/utils"; +import { + contentTopicToPubsubTopic, + createRoutingInfo, + delay +} from "@waku/utils"; import { expect } from "chai"; import { @@ -63,7 +67,13 @@ describe("Throughput Sanity Checks - Different Message Sizes", function () { await delay(1000); - // TODO await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo)); + await nwaku.ensureSubscriptions([ + contentTopicToPubsubTopic( + ContentTopic, + networkConfig.clusterId, + networkConfig.numShardsInCluster + ) + ]); waku = await createLightNode({ networkConfig }); await waku.start(); diff --git a/packages/tests/src/lib/runNodes.ts b/packages/tests/src/lib/runNodes.ts index 19ba198bd3..687ae1af25 100644 --- a/packages/tests/src/lib/runNodes.ts +++ b/packages/tests/src/lib/runNodes.ts @@ -109,16 +109,7 @@ export async function runNodes( await waku.dial(await nwaku.getMultiaddrWithId()); await waku.waitForPeers(protocols); - // TODO - - // const clusterId = networkConfig.clusterId; - - // await nwaku.ensureSubscriptions( - // relayShardsToPubsubTopics({ - // clusterId, - // shards: options.relayShards ?? [] - // }) - // ); + await nwaku.ensureSubscriptions(routingInfos.map((r) => r.pubsubTopic)); return [nwaku, waku as T]; } else { diff --git a/packages/tests/src/utils/nodes.ts b/packages/tests/src/utils/nodes.ts index 3490193d84..2275f5bd0c 100644 --- a/packages/tests/src/utils/nodes.ts +++ b/packages/tests/src/utils/nodes.ts @@ -5,7 +5,12 @@ import { Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; -import { RoutingInfo } from "@waku/utils"; +import { + contentTopicToPubsubTopic, + formatPubsubTopic, + isAutoShardingRoutingInfo, + RoutingInfo +} from "@waku/utils"; import { Context } from "mocha"; import pRetry from "p-retry"; @@ -63,13 +68,39 @@ export async function runMultipleNodes( throw new Error("Failed to initialize waku"); } + const pubsubTopics = []; + + pubsubTopics.push(routingInfo.pubsubTopic); + + if (customArgs?.shard) { + const shards = customArgs?.shard ?? []; + for (const s of shards) { + pubsubTopics.push( + formatPubsubTopic(routingInfo.networkConfig.clusterId, s) + ); + } + } + + if (customArgs?.contentTopic && isAutoShardingRoutingInfo(routingInfo)) { + const contentTopics = customArgs?.contentTopic ?? []; + for (const ct of contentTopics) { + pubsubTopics.push( + contentTopicToPubsubTopic( + ct, + routingInfo.networkConfig.clusterId, + routingInfo.networkConfig.numShardsInCluster + ) + ); + } + } + for (const node of serviceNodes.nodes) { await waku.dial(await node.getMultiaddrWithId()); await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); - // TODO - // await node.ensureSubscriptions( - // derivePubsubTopicsFromNetworkConfig(networkConfig) - // ); + + if (pubsubTopics.length > 0) { + await node.ensureSubscriptions(pubsubTopics); + } const wakuConnections = waku.libp2p.getConnections(); diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index 6fd66b9ef2..fd2bcb5290 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -8,7 +8,7 @@ import { symmetric } from "@waku/message-encryption"; import { Protocols, utf8ToBytes } from "@waku/sdk"; -import { createRoutingInfo } from "@waku/utils"; +import { createRoutingInfo, formatPubsubTopic } from "@waku/utils"; import { expect } from "chai"; import { @@ -645,18 +645,20 @@ const runTestsStatic = (strictCheckNodes: boolean): void => { routingInfo: routingInfoShard2 }); + const shardId = 2; await nwaku2.start({ filter: true, lightpush: true, relay: true, clusterId: TestClusterId, - shard: [2] + shard: [shardId] }); await waku.dial(await nwaku2.getMultiaddrWithId()); await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); - // TODO - // await nwaku2.ensureSubscriptions([customDecoder.pubsubTopic]); + await nwaku2.ensureSubscriptions([ + formatPubsubTopic(TestClusterId, shardId) + ]); const messageCollector2 = new MessageCollector();