test: re-introduce usage of ensureSubscriptions

This commit is contained in:
fryorcraken 2025-07-19 14:24:30 +10:00
parent b4787e0e87
commit b8867dee38
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
7 changed files with 88 additions and 26 deletions

View File

@ -25,10 +25,18 @@ export type ShardId = number;
export interface IRoutingInfoAutoSharding {
pubsubTopic: string;
shardId: ShardId;
// Is the network config really needed for exposure?
// we should probably aim to only expose the above + Cluster Id
networkConfig: AutoSharding;
contentTopic: string;
// This is actually a property of network config, should probably be removed
isAutoSharding: boolean;
isStaticSharding: boolean;
// This is only needed for tests, to setup nwaku node
// might be a cleaner way to handle it
contentTopic: string;
}
export interface IRoutingInfoStaticSharding {

View File

@ -1,6 +1,10 @@
import { LightNode, Protocols } from "@waku/interfaces";
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
import { createRoutingInfo, delay } from "@waku/utils";
import {
contentTopicToPubsubTopic,
createRoutingInfo,
delay
} from "@waku/utils";
import { expect } from "chai";
import {
@ -58,7 +62,13 @@ describe("High Throughput Messaging", function () {
await delay(1000);
// TODO await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo));
await nwaku.ensureSubscriptions([
contentTopicToPubsubTopic(
ContentTopic,
networkConfig.clusterId,
networkConfig.numShardsInCluster
)
]);
waku = await createLightNode({ networkConfig });
await waku.start();

View File

@ -1,6 +1,10 @@
import { LightNode, Protocols } from "@waku/interfaces";
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
import { createRoutingInfo, delay } from "@waku/utils";
import {
contentTopicToPubsubTopic,
createRoutingInfo,
delay
} from "@waku/utils";
import { expect } from "chai";
import {
@ -57,7 +61,13 @@ describe("Longevity", function () {
{ retries: 3 }
);
// TODO await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo));
await nwaku.ensureSubscriptions([
contentTopicToPubsubTopic(
ContentTopic,
networkConfig.clusterId,
networkConfig.numShardsInCluster
)
]);
waku = await createLightNode({ networkConfig });
await waku.start();

View File

@ -1,6 +1,10 @@
import { LightNode, Protocols } from "@waku/interfaces";
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
import { createRoutingInfo, delay } from "@waku/utils";
import {
contentTopicToPubsubTopic,
createRoutingInfo,
delay
} from "@waku/utils";
import { expect } from "chai";
import {
@ -63,7 +67,13 @@ describe("Throughput Sanity Checks - Different Message Sizes", function () {
await delay(1000);
// TODO await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo));
await nwaku.ensureSubscriptions([
contentTopicToPubsubTopic(
ContentTopic,
networkConfig.clusterId,
networkConfig.numShardsInCluster
)
]);
waku = await createLightNode({ networkConfig });
await waku.start();

View File

@ -109,16 +109,7 @@ export async function runNodes<T>(
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers(protocols);
// TODO
// const clusterId = networkConfig.clusterId;
// await nwaku.ensureSubscriptions(
// relayShardsToPubsubTopics({
// clusterId,
// shards: options.relayShards ?? []
// })
// );
await nwaku.ensureSubscriptions(routingInfos.map((r) => r.pubsubTopic));
return [nwaku, waku as T];
} else {

View File

@ -5,7 +5,12 @@ import {
Protocols
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { RoutingInfo } from "@waku/utils";
import {
contentTopicToPubsubTopic,
formatPubsubTopic,
isAutoShardingRoutingInfo,
RoutingInfo
} from "@waku/utils";
import { Context } from "mocha";
import pRetry from "p-retry";
@ -63,13 +68,39 @@ export async function runMultipleNodes(
throw new Error("Failed to initialize waku");
}
const pubsubTopics = [];
pubsubTopics.push(routingInfo.pubsubTopic);
if (customArgs?.shard) {
const shards = customArgs?.shard ?? [];
for (const s of shards) {
pubsubTopics.push(
formatPubsubTopic(routingInfo.networkConfig.clusterId, s)
);
}
}
if (customArgs?.contentTopic && isAutoShardingRoutingInfo(routingInfo)) {
const contentTopics = customArgs?.contentTopic ?? [];
for (const ct of contentTopics) {
pubsubTopics.push(
contentTopicToPubsubTopic(
ct,
routingInfo.networkConfig.clusterId,
routingInfo.networkConfig.numShardsInCluster
)
);
}
}
for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
// TODO
// await node.ensureSubscriptions(
// derivePubsubTopicsFromNetworkConfig(networkConfig)
// );
if (pubsubTopics.length > 0) {
await node.ensureSubscriptions(pubsubTopics);
}
const wakuConnections = waku.libp2p.getConnections();

View File

@ -8,7 +8,7 @@ import {
symmetric
} from "@waku/message-encryption";
import { Protocols, utf8ToBytes } from "@waku/sdk";
import { createRoutingInfo } from "@waku/utils";
import { createRoutingInfo, formatPubsubTopic } from "@waku/utils";
import { expect } from "chai";
import {
@ -645,18 +645,20 @@ const runTestsStatic = (strictCheckNodes: boolean): void => {
routingInfo: routingInfoShard2
});
const shardId = 2;
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
clusterId: TestClusterId,
shard: [2]
shard: [shardId]
});
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
// TODO
// await nwaku2.ensureSubscriptions([customDecoder.pubsubTopic]);
await nwaku2.ensureSubscriptions([
formatPubsubTopic(TestClusterId, shardId)
]);
const messageCollector2 = new MessageCollector();