chore: update interop test suit for latest nwaku (0.35.1) (#2345)

* update Filter test suit, make service nodes connected to each other, remove single node Filter test suit, use 0.35 nwaku image

* update light push tests

* improve auto shard tests

* update static sharding test

* skip blocked tests

* fix test

* remove usage of pusubtopic with nwaku

* remove comment
This commit is contained in:
Sasha 2025-04-11 01:34:11 +02:00 committed by GitHub
parent 28f28d0d36
commit 3793e6f5c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 752 additions and 2831 deletions

View File

@ -122,14 +122,14 @@ jobs:
uses: ./.github/workflows/test-node.yml uses: ./.github/workflows/test-node.yml
secrets: inherit secrets: inherit
with: with:
nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.31.0' }} nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.35.1' }}
test_type: node test_type: node
allure_reports: true allure_reports: true
node_optional: node_optional:
uses: ./.github/workflows/test-node.yml uses: ./.github/workflows/test-node.yml
with: with:
nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.31.0' }} nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.35.1' }}
test_type: node-optional test_type: node-optional
node_with_nwaku_master: node_with_nwaku_master:

View File

@ -18,7 +18,7 @@ export default class Dockerode {
public containerId?: string; public containerId?: string;
private static network: Docker.Network; private static network: Docker.Network;
private containerIp: string; public readonly containerIp: string;
private constructor(imageName: string, containerIp: string) { private constructor(imageName: string, containerIp: string) {
this.docker = new Docker(); this.docker = new Docker();
@ -107,7 +107,10 @@ export default class Dockerode {
const container = await this.docker.createContainer({ const container = await this.docker.createContainer({
Image: this.IMAGE_NAME, Image: this.IMAGE_NAME,
HostConfig: { HostConfig: {
NetworkMode: NETWORK_NAME,
AutoRemove: true, AutoRemove: true,
Dns: ["8.8.8.8"],
Links: [],
PortBindings: { PortBindings: {
[`${restPort}/tcp`]: [{ HostPort: restPort.toString() }], [`${restPort}/tcp`]: [{ HostPort: restPort.toString() }],
[`${tcpPort}/tcp`]: [{ HostPort: tcpPort.toString() }], [`${tcpPort}/tcp`]: [{ HostPort: tcpPort.toString() }],
@ -135,18 +138,19 @@ export default class Dockerode {
[`${discv5UdpPort}/udp`]: {} [`${discv5UdpPort}/udp`]: {}
}) })
}, },
Cmd: argsArrayWithIP Cmd: argsArrayWithIP,
}); NetworkingConfig: {
await container.start(); EndpointsConfig: {
[NETWORK_NAME]: {
await Dockerode.network.connect({ IPAMConfig: {
Container: container.id, IPv4Address: this.containerIp
EndpointConfig: { }
IPAMConfig: { }
IPv4Address: this.containerIp
} }
} }
}); });
await container.start();
const logStream = fs.createWriteStream(logPath); const logStream = fs.createWriteStream(logPath);
container.logs( container.logs(

View File

@ -1,6 +1,6 @@
import { DecodedMessage } from "@waku/core"; import { DecodedMessage } from "@waku/core";
import { NetworkConfig } from "@waku/interfaces"; import { AutoSharding, NetworkConfig, StaticSharding } from "@waku/interfaces";
import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils"; import { contentTopicToShardIndex, Logger } from "@waku/utils";
import { expect } from "chai"; import { expect } from "chai";
import { DefaultTestPubsubTopic } from "../constants.js"; import { DefaultTestPubsubTopic } from "../constants.js";
@ -29,24 +29,27 @@ export class ServiceNodesFleet {
_args?: Args, _args?: Args,
withoutFilter = false withoutFilter = false
): Promise<ServiceNodesFleet> { ): Promise<ServiceNodesFleet> {
const serviceNodePromises = Array.from( const nodes: ServiceNode[] = [];
{ length: nodesToCreate },
async () => {
const node = new ServiceNode(
makeLogFileName(mochaContext) +
Math.random().toString(36).substring(7)
);
const args = getArgs(networkConfig, _args); for (let i = 0; i < nodesToCreate; i++) {
await node.start(args, { const node = new ServiceNode(
retries: 3 makeLogFileName(mochaContext) + Math.random().toString(36).substring(7)
}); );
return node; const args = getArgs(networkConfig, _args);
if (nodes[0]) {
const addr = await nodes[0].getExternalMultiaddr();
args.staticnode = addr ?? args.staticnode;
} }
);
const nodes = await Promise.all(serviceNodePromises); await node.start(args, {
retries: 3
});
nodes.push(node);
}
return new ServiceNodesFleet(nodes, withoutFilter, strictChecking); return new ServiceNodesFleet(nodes, withoutFilter, strictChecking);
} }
@ -251,16 +254,23 @@ class MultipleNodesMessageCollector {
} }
function getArgs(networkConfig: NetworkConfig, args?: Args): Args { function getArgs(networkConfig: NetworkConfig, args?: Args): Args {
const pubsubTopics = derivePubsubTopicsFromNetworkConfig(networkConfig);
const defaultArgs = { const defaultArgs = {
lightpush: true, lightpush: true,
filter: true, filter: true,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
relay: true, relay: true,
pubsubTopic: pubsubTopics,
clusterId: networkConfig.clusterId clusterId: networkConfig.clusterId
} as Args; } as Args;
if ((networkConfig as StaticSharding).shards) {
defaultArgs.shard = (networkConfig as StaticSharding).shards;
} else if ((networkConfig as AutoSharding).contentTopics) {
defaultArgs.contentTopic = (networkConfig as AutoSharding).contentTopics;
defaultArgs.shard = (networkConfig as AutoSharding).contentTopics.map(
(topic) => contentTopicToShardIndex(topic)
);
}
return { ...defaultArgs, ...args }; return { ...defaultArgs, ...args };
} }

View File

@ -269,8 +269,6 @@ export class MessageCollector {
} }
private getPubsubTopicToUse(pubsubTopic: string | undefined): string { private getPubsubTopicToUse(pubsubTopic: string | undefined): string {
return ( return pubsubTopic || DefaultTestPubsubTopic;
pubsubTopic || this.nwaku?.pubsubTopics?.[0] || DefaultTestPubsubTopic
);
} }
} }

View File

@ -37,7 +37,7 @@ export async function runNodes<T>(
lightpush: true, lightpush: true,
relay: true, relay: true,
store: true, store: true,
pubsubTopic: pubsubTopics, shard: shardInfo.shards,
clusterId: shardInfo.clusterId clusterId: shardInfo.clusterId
}, },
{ retries: 3 } { retries: 3 }

View File

@ -1,7 +1,7 @@
import type { PeerId } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface";
import { peerIdFromString } from "@libp2p/peer-id"; import { peerIdFromString } from "@libp2p/peer-id";
import { Multiaddr, multiaddr } from "@multiformats/multiaddr"; import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
import { isDefined } from "@waku/utils"; import { isDefined, shardInfoToPubsubTopics } from "@waku/utils";
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
import pRetry from "p-retry"; import pRetry from "p-retry";
import portfinder from "portfinder"; import portfinder from "portfinder";
@ -27,7 +27,7 @@ const WAKU_SERVICE_NODE_PARAMS =
const NODE_READY_LOG_LINE = "Node setup complete"; const NODE_READY_LOG_LINE = "Node setup complete";
export const DOCKER_IMAGE_NAME = export const DOCKER_IMAGE_NAME =
process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0"; process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.35.1";
const LOG_DIR = "./log"; const LOG_DIR = "./log";
@ -235,9 +235,15 @@ export class ServiceNode {
); );
} }
public async messages(pubsubTopic?: string): Promise<MessageRpcResponse[]> { public async messages(_pubsubTopic?: string): Promise<MessageRpcResponse[]> {
const pubsubTopic =
_pubsubTopic ??
shardInfoToPubsubTopics({
clusterId: this.args?.clusterId,
shards: this.args?.shard
})[0];
return this.restCall<MessageRpcResponse[]>( return this.restCall<MessageRpcResponse[]>(
`/relay/v1/messages/${encodeURIComponent(pubsubTopic || this?.args?.pubsubTopic?.[0] || DefaultTestPubsubTopic)}`, `/relay/v1/messages/${encodeURIComponent(pubsubTopic)}`,
"GET", "GET",
null, null,
async (response) => { async (response) => {
@ -262,7 +268,7 @@ export class ServiceNode {
public async sendMessage( public async sendMessage(
message: MessageRpcQuery, message: MessageRpcQuery,
pubsubTopic?: string _pubsubTopic?: string
): Promise<boolean> { ): Promise<boolean> {
this.checkProcess(); this.checkProcess();
@ -270,8 +276,14 @@ export class ServiceNode {
message.timestamp = BigInt(new Date().valueOf()) * OneMillion; message.timestamp = BigInt(new Date().valueOf()) * OneMillion;
} }
const pubsubTopic =
_pubsubTopic ??
shardInfoToPubsubTopics({
clusterId: this.args?.clusterId,
shards: this.args?.shard
})[0];
return this.restCall<boolean>( return this.restCall<boolean>(
`/relay/v1/messages/${encodeURIComponent(pubsubTopic || this.args?.pubsubTopic?.[0] || DefaultTestPubsubTopic)}`, `/relay/v1/messages/${encodeURIComponent(pubsubTopic || DefaultTestPubsubTopic)}`,
"POST", "POST",
message, message,
async (response) => response.status === 200 async (response) => response.status === 200
@ -348,10 +360,6 @@ export class ServiceNode {
return `http://127.0.0.1:${this.restPort}`; return `http://127.0.0.1:${this.restPort}`;
} }
public get pubsubTopics(): string[] {
return this.args?.pubsubTopic ?? [];
}
public async restCall<T>( public async restCall<T>(
endpoint: string, endpoint: string,
method: "GET" | "POST", method: "GET" | "POST",
@ -377,6 +385,15 @@ export class ServiceNode {
} }
} }
public async getExternalMultiaddr(): Promise<string | undefined> {
if (!this.docker?.container) {
return undefined;
}
const containerIp = this.docker.containerIp;
const peerId = await this.getPeerId();
return `/ip4/${containerIp}/tcp/${this.websocketPort}/ws/p2p/${peerId}`;
}
private checkProcess(): void { private checkProcess(): void {
if (!this.docker?.container) { if (!this.docker?.container) {
throw `Container hasn't started`; throw `Container hasn't started`;

View File

@ -3,7 +3,7 @@ import { promisify } from "util";
const execAsync = promisify(exec); const execAsync = promisify(exec);
const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0"; const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.35.1";
async function main() { async function main() {
try { try {

View File

@ -7,7 +7,7 @@ import { ServiceNode } from "./lib/index.js";
const execAsync = promisify(exec); const execAsync = promisify(exec);
const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0"; const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.35.1";
const containerName = "rln_tree"; const containerName = "rln_tree";
async function syncRlnTree() { async function syncRlnTree() {

View File

@ -14,7 +14,6 @@ export interface Args {
peerExchange?: boolean; peerExchange?: boolean;
discv5Discovery?: boolean; discv5Discovery?: boolean;
storeMessageDbUrl?: string; storeMessageDbUrl?: string;
pubsubTopic?: Array<string>;
contentTopic?: Array<string>; contentTopic?: Array<string>;
websocketSupport?: boolean; websocketSupport?: boolean;
tcpPort?: number; tcpPort?: number;

View File

@ -45,7 +45,6 @@ export async function runMultipleNodes(
}; };
const waku = await createLightNode(wakuOptions); const waku = await createLightNode(wakuOptions);
await waku.start();
if (!waku) { if (!waku) {
throw new Error("Failed to initialize waku"); throw new Error("Failed to initialize waku");

View File

@ -6,7 +6,6 @@ import { expect } from "chai";
import { import {
afterEachCustom, afterEachCustom,
DefaultTestPubsubTopic,
makeLogFileName, makeLogFileName,
NOISE_KEY_1, NOISE_KEY_1,
ServiceNode, ServiceNode,
@ -30,7 +29,8 @@ describe("ENR Interop: ServiceNode", function () {
store: false, store: false,
filter: false, filter: false,
lightpush: false, lightpush: false,
pubsubTopic: [DefaultTestPubsubTopic] clusterId: DefaultTestShardInfo.clusterId,
shard: DefaultTestShardInfo.shards
}); });
const multiAddrWithId = await nwaku.getMultiaddrWithId(); const multiAddrWithId = await nwaku.getMultiaddrWithId();
@ -64,7 +64,8 @@ describe("ENR Interop: ServiceNode", function () {
store: true, store: true,
filter: false, filter: false,
lightpush: false, lightpush: false,
pubsubTopic: [DefaultTestPubsubTopic] clusterId: DefaultTestShardInfo.clusterId,
shard: DefaultTestShardInfo.shards
}); });
const multiAddrWithId = await nwaku.getMultiaddrWithId(); const multiAddrWithId = await nwaku.getMultiaddrWithId();
@ -98,7 +99,8 @@ describe("ENR Interop: ServiceNode", function () {
store: true, store: true,
filter: true, filter: true,
lightpush: true, lightpush: true,
pubsubTopic: [DefaultTestPubsubTopic] clusterId: DefaultTestShardInfo.clusterId,
shard: DefaultTestShardInfo.shards
}); });
const multiAddrWithId = await nwaku.getMultiaddrWithId(); const multiAddrWithId = await nwaku.getMultiaddrWithId();

View File

@ -15,7 +15,11 @@ import {
createEncoder as createSymEncoder createEncoder as createSymEncoder
} from "@waku/message-encryption/symmetric"; } from "@waku/message-encryption/symmetric";
import { createLightNode } from "@waku/sdk"; import { createLightNode } from "@waku/sdk";
import { contentTopicToPubsubTopic, Logger } from "@waku/utils"; import {
contentTopicToPubsubTopic,
contentTopicToShardIndex,
Logger
} from "@waku/utils";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai"; import { expect } from "chai";
@ -84,14 +88,15 @@ describe("Waku Message Ephemeral field", function () {
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
nwaku = new ServiceNode(makeLogFileName(this.ctx)); nwaku = new ServiceNode(makeLogFileName(this.ctx));
const contentTopics = [TestContentTopic, AsymContentTopic, SymContentTopic];
await nwaku.start({ await nwaku.start({
filter: true, filter: true,
lightpush: true, lightpush: true,
store: true, store: true,
relay: true, relay: true,
pubsubTopic: [PubsubTopic], contentTopic: contentTopics,
contentTopic: [TestContentTopic, AsymContentTopic, SymContentTopic], clusterId: ClusterId,
clusterId: ClusterId shard: contentTopics.map((t) => contentTopicToShardIndex(t))
}); });
await nwaku.ensureSubscriptionsAutosharding([ await nwaku.ensureSubscriptionsAutosharding([
TestContentTopic, TestContentTopic,

View File

@ -146,6 +146,29 @@ const runTests = (strictCheckNodes: boolean): void => {
).to.eq(false); ).to.eq(false);
}); });
it("Check message with no pubsub topic is not received", async function () {
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.nodes[0].restCall<boolean>(
`/relay/v1/messages/`,
"POST",
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
async (res) => res.status === 200
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message with no content topic is not received", async function () { it("Check message with no content topic is not received", async function () {
await waku.filter.subscribe( await waku.filter.subscribe(
[TestDecoder], [TestDecoder],

View File

@ -1,456 +0,0 @@
import { createDecoder, createEncoder } from "@waku/core";
import type {
ContentTopicInfo,
LightNode,
ShardInfo,
SingleShardInfo
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import {
contentTopicToPubsubTopic,
contentTopicToShardIndex,
singleShardInfoToPubsubTopic
} from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
makeLogFileName,
MessageCollector,
ServiceNode,
tearDownNodes
} from "../../../src/index.js";
import { runNodes } from "./utils.js";
describe("Waku Filter V2: Multiple PubsubTopics", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(30000);
let waku: LightNode;
let nwaku: ServiceNode;
let nwaku2: ServiceNode;
let messageCollector: MessageCollector;
const customPubsubTopic1 = singleShardInfoToPubsubTopic({
clusterId: 3,
shard: 1
});
const customPubsubTopic2 = singleShardInfoToPubsubTopic({
clusterId: 3,
shard: 2
});
const shardInfo: ShardInfo = { clusterId: 3, shards: [1, 2] };
const singleShardInfo1: SingleShardInfo = { clusterId: 3, shard: 1 };
const singleShardInfo2: SingleShardInfo = { clusterId: 3, shard: 2 };
const customContentTopic1 = "/test/2/waku-filter";
const customContentTopic2 = "/test/3/waku-filter";
const customEncoder1 = createEncoder({
pubsubTopicShardInfo: singleShardInfo1,
contentTopic: customContentTopic1
});
const customDecoder1 = createDecoder(customContentTopic1, singleShardInfo1);
const customEncoder2 = createEncoder({
pubsubTopicShardInfo: singleShardInfo2,
contentTopic: customContentTopic2
});
const customDecoder2 = createDecoder(customContentTopic2, singleShardInfo2);
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo);
messageCollector = new MessageCollector();
});
afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], waku);
});
it("Subscribe and receive messages on custom pubsubtopic", async function () {
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1,
expectedMessageText: "M1"
});
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
const messageCollector2 = new MessageCollector();
await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
expect(await messageCollector2.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1,
expectedMessageText: "M1"
});
messageCollector2.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: customPubsubTopic2,
expectedMessageText: "M2"
});
});
it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
// Set up and start a new nwaku node with customPubsubTopic1
nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
pubsubTopic: [customPubsubTopic2],
clusterId: 3
});
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
await nwaku2.ensureSubscriptions([customPubsubTopic2]);
const messageCollector2 = new MessageCollector();
await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
// Making sure that messages are send and reveiced for both subscriptions
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
while (
!(await messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic1
})) ||
!(await messageCollector2.waitForMessages(1, {
pubsubTopic: customPubsubTopic2
}))
) {
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
}
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1,
expectedMessageText: "M1"
});
messageCollector2.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: customPubsubTopic2,
expectedMessageText: "M2"
});
});
});
describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(30000);
const clusterId = 3;
let waku: LightNode;
let nwaku: ServiceNode;
let nwaku2: ServiceNode;
let messageCollector: MessageCollector;
const customContentTopic1 = "/waku/2/content/utf8";
const customContentTopic2 = "/myapp/1/latest/proto";
const autoshardingPubsubTopic1 = contentTopicToPubsubTopic(
customContentTopic1,
clusterId
);
const autoshardingPubsubTopic2 = contentTopicToPubsubTopic(
customContentTopic2,
clusterId
);
const contentTopicInfo: ContentTopicInfo = {
clusterId: clusterId,
contentTopics: [customContentTopic1, customContentTopic2]
};
const customEncoder1 = createEncoder({
contentTopic: customContentTopic1,
pubsubTopicShardInfo: {
clusterId: clusterId,
shard: contentTopicToShardIndex(customContentTopic1)
}
});
const customDecoder1 = createDecoder(customContentTopic1, {
clusterId: clusterId,
shard: contentTopicToShardIndex(customContentTopic1)
});
const customEncoder2 = createEncoder({
contentTopic: customContentTopic2,
pubsubTopicShardInfo: {
clusterId: clusterId,
shard: contentTopicToShardIndex(customContentTopic2)
}
});
const customDecoder2 = createDecoder(customContentTopic2, {
clusterId: clusterId,
shard: contentTopicToShardIndex(customContentTopic2)
});
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, contentTopicInfo);
messageCollector = new MessageCollector();
});
afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], waku);
});
it("Subscribe and receive messages on autosharded pubsubtopic", async function () {
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
expect(
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1,
expectedMessageText: "M1"
});
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
const messageCollector2 = new MessageCollector();
await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
expect(
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1
})
).to.eq(true);
expect(
await messageCollector2.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic2
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1,
expectedMessageText: "M1"
});
messageCollector2.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2,
expectedMessageText: "M2"
});
});
it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
// Set up and start a new nwaku node with customPubsubTopic1
nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
pubsubTopic: [autoshardingPubsubTopic2],
clusterId: clusterId,
contentTopic: [customContentTopic2]
});
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
const messageCollector2 = new MessageCollector();
await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
// Making sure that messages are send and reveiced for both subscriptions
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
while (
!(await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1
})) ||
!(await messageCollector2.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic2
}))
) {
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
}
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1,
expectedMessageText: "M1"
});
messageCollector2.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2,
expectedMessageText: "M2"
});
});
it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () {
// this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2`
try {
await waku.filter.subscribe([customDecoder2], messageCollector.callback);
} catch (error) {
expect((error as Error).message).to.include(
"Pubsub topic not configured"
);
}
});
});
describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(30000);
let waku: LightNode;
let nwaku: ServiceNode;
let nwaku2: ServiceNode;
let messageCollector: MessageCollector;
const customPubsubTopic1 = singleShardInfoToPubsubTopic({
clusterId: 3,
shard: 1
});
const customPubsubTopic2 = singleShardInfoToPubsubTopic({
clusterId: 3,
shard: 2
});
const shardInfo = {
clusterId: 3,
shards: [1, 2]
};
const customContentTopic1 = "/test/2/waku-filter";
const customContentTopic2 = "/test/3/waku-filter";
const customEncoder1 = createEncoder({
pubsubTopic: customPubsubTopic1,
contentTopic: customContentTopic1
});
const customDecoder1 = createDecoder(customContentTopic1, customPubsubTopic1);
const customEncoder2 = createEncoder({
pubsubTopic: customPubsubTopic2,
contentTopic: customContentTopic2
});
const customDecoder2 = createDecoder(customContentTopic2, customPubsubTopic2);
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo);
messageCollector = new MessageCollector();
});
afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], waku);
});
it("Subscribe and receive messages on custom pubsubtopic", async function () {
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1,
expectedMessageText: "M1"
});
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
const messageCollector2 = new MessageCollector();
await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
expect(await messageCollector2.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1,
expectedMessageText: "M1"
});
messageCollector2.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: customPubsubTopic2,
expectedMessageText: "M2"
});
});
it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
// Set up and start a new nwaku node with customPubsubTopic1
nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
pubsubTopic: [customPubsubTopic2],
clusterId: 3
});
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
await nwaku2.ensureSubscriptions([customPubsubTopic2]);
const messageCollector2 = new MessageCollector();
await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
// Making sure that messages are send and reveiced for both subscriptions
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
while (
!(await messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic1
})) ||
!(await messageCollector2.waitForMessages(1, {
pubsubTopic: customPubsubTopic2
}))
) {
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
}
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1,
expectedMessageText: "M1"
});
messageCollector2.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: customPubsubTopic2,
expectedMessageText: "M2"
});
});
it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () {
// this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2`
try {
await waku.filter.subscribe([customDecoder2], messageCollector.callback);
} catch (error) {
expect((error as Error).message).to.include(
"Pubsub topic not configured"
);
}
});
});

View File

@ -1,129 +0,0 @@
import { ISubscription, LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
MessageCollector,
ServiceNode,
tearDownNodes
} from "../../../src/index.js";
import {
TestContentTopic,
TestDecoder,
TestEncoder,
TestShardInfo,
validatePingError
} from "../utils.js";
import { runNodes } from "./utils.js";
describe("Waku Filter V2: Ping", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
let nwaku: ServiceNode;
let messageCollector: MessageCollector;
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
messageCollector = new MessageCollector();
});
afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku);
});
it("Ping on subscribed peer", async function () {
const { subscription, error } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
// If ping is successfull(node has active subscription) we receive a success status code.
await subscription.ping();
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm new messages are received after a ping.
expect(await messageCollector.waitForMessages(2)).to.eq(true);
});
it("Ping on peer without subscriptions", async function () {
const { subscription, error } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await subscription.unsubscribe([TestContentTopic]);
await validatePingError(subscription);
});
it("Ping on unsubscribed peer", async function () {
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await subscription.ping();
await subscription.unsubscribe([TestContentTopic]);
// Ping imediately after unsubscribe
await validatePingError(subscription);
});
it("Reopen subscription with peer with lost subscription", async function () {
let subscription: ISubscription;
const openSubscription = async (): Promise<void> => {
const result = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (result.error) {
throw result.error;
}
subscription = result.subscription;
};
const unsubscribe = async (): Promise<void> => {
await subscription.unsubscribe([TestContentTopic]);
};
const pingAndReinitiateSubscription = async (): Promise<void> => {
try {
await subscription.ping();
} catch (error) {
if (
error instanceof Error &&
error.message.includes("peer has no subscriptions")
) {
await openSubscription();
} else {
throw error;
}
}
};
// open subscription & ping -> should pass
await openSubscription();
await pingAndReinitiateSubscription();
// unsubscribe & ping -> should fail and reinitiate subscription
await unsubscribe();
await pingAndReinitiateSubscription();
// ping -> should pass as subscription is reinitiated
await pingAndReinitiateSubscription();
});
});

View File

@ -1,255 +0,0 @@
import { LightNode, Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
delay,
MessageCollector,
ServiceNode,
tearDownNodes,
TEST_STRING,
TEST_TIMESTAMPS
} from "../../../src/index.js";
import { runNodes } from "../../light-push/utils.js";
import {
messageText,
TestContentTopic,
TestDecoder,
TestEncoder,
TestPubsubTopic,
TestShardInfo
} from "../utils.js";
describe("Waku Filter V2: FilterPush", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
let nwaku: ServiceNode;
let messageCollector: MessageCollector;
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
messageCollector = new MessageCollector(nwaku);
});
afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku);
});
TEST_STRING.forEach((testItem) => {
it(`Check received message containing ${testItem.description}`, async function () {
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(testItem.value)
});
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: testItem.value,
expectedContentTopic: TestContentTopic
});
});
});
TEST_TIMESTAMPS.forEach((testItem) => {
it(`Check received message with timestamp: ${testItem} `, async function () {
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
`/relay/v1/messages/${encodeURIComponent(TestPubsubTopic)}`,
"POST",
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: testItem,
version: 0
},
async (res) => res.status === 200
);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
checkTimestamp: false,
expectedContentTopic: TestContentTopic
});
// Check if the timestamp matches
const timestamp = messageCollector.getMessage(0).timestamp;
if (testItem == undefined) {
expect(timestamp).to.eq(undefined);
}
if (timestamp !== undefined && timestamp instanceof Date) {
expect(testItem?.toString()).to.contain(timestamp.getTime().toString());
}
});
});
it("Check message with invalid timestamp is not received", async function () {
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
`/relay/v1/messages/${encodeURIComponent(TestPubsubTopic)}`,
"POST",
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: "2023-09-06T12:05:38.609Z"
},
async (res) => res.status === 200
);
// Verify that no message was received
expect(await messageCollector.waitForMessages(1)).to.eq(false);
});
it("Check message on other pubsub topic is not received", async function () {
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
`/relay/v1/messages/${encodeURIComponent("/othertopic")}`,
"POST",
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
async (res) => res.status === 200
);
expect(await messageCollector.waitForMessages(1)).to.eq(false);
});
it("Check message with no pubsub topic is not received", async function () {
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
`/relay/v1/messages/`,
"POST",
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
async (res) => res.status === 200
);
expect(await messageCollector.waitForMessages(1)).to.eq(false);
});
it("Check message with no content topic is not received", async function () {
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
`/relay/v1/messages/${encodeURIComponent(TestPubsubTopic)}`,
"POST",
{
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
async (res) => res.status === 200
);
expect(await messageCollector.waitForMessages(1)).to.eq(false);
});
it("Check message with no payload is not received", async function () {
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
`/relay/v1/messages/${encodeURIComponent(TestPubsubTopic)}`,
"POST",
{
contentTopic: TestContentTopic,
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
async (res) => res.status === 200
);
expect(await messageCollector.waitForMessages(1)).to.eq(false);
});
it("Check message with non string payload is not received", async function () {
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
`/relay/v1/messages/${encodeURIComponent(TestPubsubTopic)}`,
"POST",
{
contentTopic: TestContentTopic,
payload: 12345,
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
async (res) => res.status === 200
);
expect(await messageCollector.waitForMessages(1)).to.eq(false);
});
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
it.skip("Check message received after jswaku node is restarted", async function () {
// Subscribe and send message
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
// Restart js-waku node
await waku.stop();
expect(waku.isStarted()).to.eq(false);
await waku.start();
expect(waku.isStarted()).to.eq(true);
// Redo the connection and create a new subscription
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm both messages were received.
expect(await messageCollector.waitForMessages(2)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic
});
messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2",
expectedContentTopic: TestContentTopic
});
});
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
it.skip("Check message received after nwaku node is restarted", async function () {
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
// Restart nwaku node
await tearDownNodes(nwaku, []);
await nwaku.start();
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm both messages were received.
expect(await messageCollector.waitForMessages(2)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic
});
messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2",
expectedContentTopic: TestContentTopic
});
});
});

View File

@ -1,470 +0,0 @@
import { createDecoder, createEncoder } from "@waku/core";
import { LightNode, Protocols } from "@waku/interfaces";
import {
ecies,
generatePrivateKey,
generateSymmetricKey,
getPublicKey,
symmetric
} from "@waku/message-encryption";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import type { Context } from "mocha";
import {
afterEachCustom,
beforeEachCustom,
delay,
generateTestData,
MessageCollector,
ServiceNode,
tearDownNodes,
TEST_STRING
} from "../../../src/index.js";
import {
messagePayload,
messageText,
TestContentTopic,
TestDecoder,
TestEncoder,
TestPubsubTopic,
TestShardInfo
} from "../utils.js";
import { runNodes } from "./utils.js";
describe("Waku Filter V2: Subscribe: Single Service Node", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
let waku2: LightNode;
let nwaku: ServiceNode;
let nwaku2: ServiceNode;
let messageCollector: MessageCollector;
let ctx: Context;
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
messageCollector = new MessageCollector();
await nwaku.ensureSubscriptions([TestPubsubTopic]);
});
afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], [waku, waku2]);
});
it("Subscribe and receive messages via lightPush", async function () {
const { error } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
expect((await nwaku.messages()).length).to.eq(1);
});
it("Subscribe and receive ecies encrypted messages via lightPush", async function () {
const privateKey = generatePrivateKey();
const publicKey = getPublicKey(privateKey);
const encoder = ecies.createEncoder({
contentTopic: TestContentTopic,
publicKey,
pubsubTopic: TestPubsubTopic
});
const decoder = ecies.createDecoder(
TestContentTopic,
privateKey,
TestPubsubTopic
);
const { error } = await waku.filter.subscribe(
[decoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(encoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedVersion: 1,
expectedPubsubTopic: TestPubsubTopic
});
expect((await nwaku.messages()).length).to.eq(1);
});
it("Subscribe and receive symmetrically encrypted messages via lightPush", async function () {
const symKey = generateSymmetricKey();
const encoder = symmetric.createEncoder({
contentTopic: TestContentTopic,
symKey,
pubsubTopic: TestPubsubTopic
});
const decoder = symmetric.createDecoder(
TestContentTopic,
symKey,
TestPubsubTopic
);
const { error } = await waku.filter.subscribe(
[decoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(encoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedVersion: 1,
expectedPubsubTopic: TestPubsubTopic
});
expect((await nwaku.messages()).length).to.eq(1);
});
it("Subscribe and receive messages via waku relay post", async function () {
const { error } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await delay(400);
// Send a test message using the relay post method.
await nwaku.sendMessage(
ServiceNode.toMessageRpcQuery({
contentTopic: TestContentTopic,
payload: utf8ToBytes(messageText)
})
);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
expect((await nwaku.messages()).length).to.eq(1);
});
it("Subscribe and receive 2 messages on the same topic", async function () {
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
// Send another message on the same topic.
const newMessageText = "Filtering still works!";
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(newMessageText)
});
// Verify that the second message was successfully received.
expect(await messageCollector.waitForMessages(2)).to.eq(true);
messageCollector.verifyReceivedMessage(1, {
expectedMessageText: newMessageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
expect((await nwaku.messages()).length).to.eq(2);
});
it("Subscribe and receive messages on 2 different content topics", async function () {
// Subscribe to the first content topic and send a message.
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
// Modify subscription to include a new content topic and send a message.
const newMessageText = "Filtering still works!";
const newMessagePayload = { payload: utf8ToBytes(newMessageText) };
const newContentTopic = "/test/2/waku-filter/default";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await subscription.subscribe([newDecoder], messageCollector.callback);
await waku.lightPush.send(newEncoder, {
payload: utf8ToBytes(newMessageText)
});
expect(await messageCollector.waitForMessages(2)).to.eq(true);
messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: newContentTopic,
expectedMessageText: newMessageText,
expectedPubsubTopic: TestPubsubTopic
});
// Send another message on the initial content topic to verify it still works.
await waku.lightPush.send(TestEncoder, newMessagePayload);
expect(await messageCollector.waitForMessages(3)).to.eq(true);
messageCollector.verifyReceivedMessage(2, {
expectedMessageText: newMessageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
expect((await nwaku.messages()).length).to.eq(3);
});
it("Subscribe and receives messages on 20 topics", async function () {
const topicCount = 20;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
// Subscribe to all 20 topics.
for (let i = 0; i < topicCount; i++) {
await waku.filter.subscribe([td.decoders[i]], messageCollector.callback);
}
// Send a unique message on each topic.
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
// Verify that each message was received on the corresponding topic.
expect(await messageCollector.waitForMessages(20)).to.eq(true);
td.contentTopics.forEach((topic, index) => {
messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
});
});
it("Subscribe to 100 topics (new limit) at once and receives messages", async function () {
this.timeout(100_000);
const topicCount = 100;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
await waku.filter.subscribe(td.decoders, messageCollector.callback);
// Send a unique message on each topic.
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
// Verify that each message was received on the corresponding topic.
expect(await messageCollector.waitForMessages(topicCount)).to.eq(true);
td.contentTopics.forEach((topic, index) => {
messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic,
checkTimestamp: false
});
});
});
it("Error when try to subscribe to more than 101 topics (new limit)", async function () {
const topicCount = 101;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
try {
const { error, results } = await waku.filter.subscribe(
td.decoders,
messageCollector.callback
);
if (error) {
throw error;
}
const { failures, successes } = results;
if (failures.length === 0 || successes.length > 0) {
throw new Error(
`Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.`
);
}
} catch (err) {
if (
err instanceof Error &&
err.message.includes(
`exceeds maximum content topics: ${topicCount - 1}`
)
) {
return;
} else {
throw err;
}
}
});
it("Overlapping topic subscription", async function () {
// Define two sets of test data with overlapping topics.
const topicCount1 = 2;
const td1 = generateTestData(topicCount1, { pubsubTopic: TestPubsubTopic });
const topicCount2 = 4;
const td2 = generateTestData(topicCount2, { pubsubTopic: TestPubsubTopic });
// Subscribe to the first set of topics.
await waku.filter.subscribe(td1.decoders, messageCollector.callback);
// Subscribe to the second set of topics which has overlapping topics with the first set.
await waku.filter.subscribe(td2.decoders, messageCollector.callback);
// Send messages to the first set of topics.
for (let i = 0; i < topicCount1; i++) {
const messageText = `Topic Set 1: Message Number: ${i + 1}`;
await waku.lightPush.send(td1.encoders[i], {
payload: utf8ToBytes(messageText)
});
}
// Send messages to the second set of topics.
for (let i = 0; i < topicCount2; i++) {
const messageText = `Topic Set 2: Message Number: ${i + 1}`;
await waku.lightPush.send(td2.encoders[i], {
payload: utf8ToBytes(messageText)
});
}
// Check if all messages were received.
// Since there are overlapping topics, there should be 6 messages in total (2 from the first set + 4 from the second set).
expect(await messageCollector.waitForMessages(6, { exact: true })).to.eq(
true
);
});
it("Refresh subscription", async function () {
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
// Resubscribe (refresh) to the same topic and send another message.
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm both messages were received.
expect(await messageCollector.waitForMessages(2, { exact: true })).to.eq(
true
);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
TEST_STRING.forEach((testItem) => {
it(`Subscribe to topic containing ${testItem.description} and receive message`, async function () {
const newContentTopic = testItem.value;
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.filter.subscribe([newDecoder], messageCollector.callback);
await waku.lightPush.send(newEncoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: newContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
});
it("Add multiple subscription objects on single nwaku node", async function () {
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
const newContentTopic = "/test/2/waku-filter/default";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.filter.subscribe([newDecoder], messageCollector.callback);
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
// Check if both messages were received
expect(await messageCollector.waitForMessages(2)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: newContentTopic,
expectedMessageText: "M2",
expectedPubsubTopic: TestPubsubTopic
});
});
it("Subscribe and receive messages from multiple nwaku nodes", async function () {
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
// Set up and start a new nwaku node
[nwaku2, waku2] = await runNodes(ctx, TestShardInfo);
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
await nwaku2.ensureSubscriptions([TestPubsubTopic]);
// Send a message using the new subscription
const newContentTopic = "/test/2/waku-filter/default";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.filter.subscribe([newDecoder], messageCollector.callback);
// Making sure that messages are send and reveiced for both subscriptions
while (!(await messageCollector.waitForMessages(2))) {
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
}
// Check if both messages were received
expect(messageCollector.hasMessage(TestContentTopic, "M1")).to.eq(true);
expect(messageCollector.hasMessage(newContentTopic, "M2")).to.eq(true);
});
});

View File

@ -1,209 +0,0 @@
import { createDecoder, createEncoder } from "@waku/core";
import { LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
generateTestData,
MessageCollector,
ServiceNode,
tearDownNodes
} from "../../../src/index.js";
import { runNodes } from "../../light-push/utils.js";
import {
messagePayload,
messageText,
TestContentTopic,
TestDecoder,
TestEncoder,
TestPubsubTopic,
TestShardInfo
} from "../utils.js";
describe("Waku Filter V2: Unsubscribe", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
let nwaku: ServiceNode;
let messageCollector: MessageCollector;
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
messageCollector = new MessageCollector();
await nwaku.ensureSubscriptions([TestPubsubTopic]);
});
afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku);
});
it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {
const { subscription, error } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
// Unsubscribe from the topic and send again
await subscription.unsubscribe([TestContentTopic]);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(2)).to.eq(false);
// Check that from 2 messages send only the 1st was received
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
expect(messageCollector.count).to.eq(1);
expect((await nwaku.messages()).length).to.eq(2);
});
it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
const newContentTopic = "/test/2/waku-filter";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await subscription.subscribe([newDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
expect(await messageCollector.waitForMessages(2)).to.eq(true);
// Unsubscribe from the first topic and send again
await subscription.unsubscribe([TestContentTopic]);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") });
expect(await messageCollector.waitForMessages(3)).to.eq(true);
// Check that from 4 messages send 3 were received
expect(messageCollector.count).to.eq(3);
expect((await nwaku.messages()).length).to.eq(4);
});
it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
const newContentTopic = "/test/2/waku-filter/default";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await subscription.subscribe([newDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
expect(await messageCollector.waitForMessages(2)).to.eq(true);
// Unsubscribe from both and send again
await subscription.unsubscribe([TestContentTopic, newContentTopic]);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") });
expect(await messageCollector.waitForMessages(3)).to.eq(false);
// Check that from 4 messages send 2 were received
expect(messageCollector.count).to.eq(2);
expect((await nwaku.messages()).length).to.eq(4);
});
it("Unsubscribe topics the node is not subscribed to", async function () {
// Subscribe to 1 topic and send message
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
expect(messageCollector.count).to.eq(1);
// Unsubscribe from topics that the node is not not subscribed to and send again
await subscription.unsubscribe([]);
await subscription.unsubscribe(["/test/2/waku-filter/default"]);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
expect(await messageCollector.waitForMessages(2)).to.eq(true);
// Check that both messages were received
expect(messageCollector.count).to.eq(2);
expect((await nwaku.messages()).length).to.eq(2);
});
it("Unsubscribes all - node subscribed to 1 topic", async function () {
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
expect(messageCollector.count).to.eq(1);
// Unsubscribe from all topics and send again
await subscription.unsubscribeAll();
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
expect(await messageCollector.waitForMessages(2)).to.eq(false);
// Check that from 2 messages send only the 1st was received
expect(messageCollector.count).to.eq(1);
expect((await nwaku.messages()).length).to.eq(2);
});
it("Unsubscribes all - node subscribed to 10 topics", async function () {
// Subscribe to 10 topics and send message
const topicCount = 10;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
const { error, subscription } = await waku.filter.subscribe(
td.decoders,
messageCollector.callback
);
if (error) {
throw error;
}
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`M${i + 1}`)
});
}
expect(await messageCollector.waitForMessages(10)).to.eq(true);
// Unsubscribe from all topics and send again
await subscription.unsubscribeAll();
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`M${topicCount + i + 1}`)
});
}
expect(await messageCollector.waitForMessages(11)).to.eq(false);
// Check that from 20 messages send only 10 were received
expect(messageCollector.count).to.eq(10);
expect((await nwaku.messages()).length).to.eq(20);
});
});

View File

@ -1,22 +0,0 @@
import { LightNode, NetworkConfig, Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { Logger } from "@waku/utils";
import { Context } from "mocha";
import {
runNodes as runNodesBuilder,
ServiceNode
} from "../../../src/index.js";
export const log = new Logger("test:filter:single_node");
export const runNodes = (
context: Context,
shardInfo: NetworkConfig
): Promise<[ServiceNode, LightNode]> =>
runNodesBuilder<LightNode>({
context,
createNode: createLightNode,
protocols: [Protocols.LightPush, Protocols.Filter],
networkConfig: shardInfo
});

View File

@ -7,7 +7,7 @@ import {
getPublicKey, getPublicKey,
symmetric symmetric
} from "@waku/message-encryption"; } from "@waku/message-encryption";
import { utf8ToBytes } from "@waku/sdk"; import { Protocols, utf8ToBytes } from "@waku/sdk";
import { expect } from "chai"; import { expect } from "chai";
import { import {
@ -15,14 +15,19 @@ import {
beforeEachCustom, beforeEachCustom,
delay, delay,
generateTestData, generateTestData,
makeLogFileName,
MessageCollector,
runMultipleNodes, runMultipleNodes,
ServiceNode,
ServiceNodesFleet, ServiceNodesFleet,
tearDownNodes,
teardownNodesWithRedundancy, teardownNodesWithRedundancy,
TEST_STRING, TEST_STRING,
waitForConnections waitForConnections
} from "../../src/index.js"; } from "../../src/index.js";
import { import {
ClusterId,
messagePayload, messagePayload,
messageText, messageText,
TestContentTopic, TestContentTopic,
@ -103,7 +108,7 @@ const runTests = (strictCheckNodes: boolean): void => {
expectedPubsubTopic: TestPubsubTopic expectedPubsubTopic: TestPubsubTopic
}); });
await serviceNodes.confirmMessageLength(1); await serviceNodes.confirmMessageLength(2);
}); });
it("Subscribe and receive symmetrically encrypted messages via lightPush", async function () { it("Subscribe and receive symmetrically encrypted messages via lightPush", async function () {
@ -136,7 +141,7 @@ const runTests = (strictCheckNodes: boolean): void => {
expectedPubsubTopic: TestPubsubTopic expectedPubsubTopic: TestPubsubTopic
}); });
await serviceNodes.confirmMessageLength(1); await serviceNodes.confirmMessageLength(2);
}); });
it("Subscribe and receive messages via waku relay post", async function () { it("Subscribe and receive messages via waku relay post", async function () {
@ -532,6 +537,98 @@ const runTests = (strictCheckNodes: boolean): void => {
expectedContentTopic: TestContentTopic expectedContentTopic: TestContentTopic
}); });
}); });
it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
// Set up and start a new nwaku node with customPubsubTopic1
const nwaku2 = new ServiceNode(makeLogFileName(this) + "3");
try {
const customContentTopic = "/test/4/waku-filter/default";
const customDecoder = createDecoder(customContentTopic, {
clusterId: ClusterId,
shard: 4
});
const customEncoder = createEncoder({
contentTopic: customContentTopic,
pubsubTopicShardInfo: { clusterId: ClusterId, shard: 4 }
});
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
clusterId: ClusterId,
shard: [4]
});
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
await nwaku2.ensureSubscriptions([customDecoder.pubsubTopic]);
const messageCollector2 = new MessageCollector();
await waku.filter.subscribe(
[customDecoder],
messageCollector2.callback
);
// Making sure that messages are send and reveiced for both subscriptions
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
while (
!(await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: TestDecoder.pubsubTopic
})) ||
!(await messageCollector2.waitForMessages(1, {
pubsubTopic: customDecoder.pubsubTopic
}))
) {
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("M1")
});
await waku.lightPush.send(customEncoder, {
payload: utf8ToBytes("M2")
});
}
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: TestDecoder.contentTopic,
expectedPubsubTopic: TestDecoder.pubsubTopic,
expectedMessageText: "M1"
});
messageCollector2.verifyReceivedMessage(0, {
expectedContentTopic: customDecoder.contentTopic,
expectedPubsubTopic: customDecoder.pubsubTopic,
expectedMessageText: "M2"
});
} catch (e) {
await tearDownNodes([nwaku2], []);
}
});
it("Should fail to subscribe with decoder with wrong shard", async function () {
const wrongDecoder = createDecoder(TestDecoder.contentTopic, {
clusterId: ClusterId,
shard: 5
});
// this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2`
try {
await waku.filter.subscribe(
[wrongDecoder],
serviceNodes.messageCollector.callback
);
} catch (error) {
expect((error as Error).message).to.include(
`Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.`
);
}
});
}); });
}; };

View File

@ -0,0 +1,141 @@
import { createEncoder } from "@waku/core";
import { LightNode, Protocols } from "@waku/interfaces";
import { contentTopicToPubsubTopic } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
makeLogFileName,
MessageCollector,
runMultipleNodes,
ServiceNode,
ServiceNodesFleet,
tearDownNodes,
teardownNodesWithRedundancy
} from "../../src/index.js";
import { ClusterId, TestEncoder } from "./utils.js";
describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
this.timeout(30000);
const numServiceNodes = 2;
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
const customEncoder2 = createEncoder({
contentTopic: "/test/2/waku-light-push/utf8",
pubsubTopic: contentTopicToPubsubTopic(
"/test/2/waku-light-push/utf8",
ClusterId
)
});
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
{
clusterId: ClusterId,
contentTopics: [TestEncoder.contentTopic, customEncoder2.contentTopic]
},
{ lightpush: true, filter: true },
false,
numServiceNodes,
false
);
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
const pushResponse1 = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("M1")
});
const pushResponse2 = await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2")
});
expect(pushResponse1.successes.length).to.eq(numServiceNodes);
expect(pushResponse2.successes.length).to.eq(numServiceNodes);
const messageCollector1 = new MessageCollector(serviceNodes.nodes[0]);
const messageCollector2 = new MessageCollector(serviceNodes.nodes[1]);
expect(
await messageCollector1.waitForMessages(1, {
pubsubTopic: TestEncoder.pubsubTopic
})
).to.eq(true);
expect(
await messageCollector2.waitForMessages(1, {
pubsubTopic: customEncoder2.pubsubTopic
})
).to.eq(true);
messageCollector1.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestEncoder.contentTopic,
expectedPubsubTopic: TestEncoder.pubsubTopic
});
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customEncoder2.contentTopic,
expectedPubsubTopic: customEncoder2.pubsubTopic
});
});
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () {
// Set up and start a new nwaku node with Default PubsubTopic
const nwaku2 = new ServiceNode(makeLogFileName(this) + "3");
try {
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
clusterId: ClusterId,
shard: [2]
});
await nwaku2.ensureSubscriptionsAutosharding([
customEncoder2.pubsubTopic
]);
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
const messageCollector2 = new MessageCollector(nwaku2);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("M1")
});
await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2")
});
await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: TestEncoder.pubsubTopic
});
await messageCollector2.waitForMessagesAutosharding(1, {
contentTopic: customEncoder2.contentTopic
});
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestEncoder.contentTopic,
expectedPubsubTopic: TestEncoder.pubsubTopic
});
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customEncoder2.contentTopic,
expectedPubsubTopic: customEncoder2.pubsubTopic
});
} catch (e) {
await tearDownNodes([nwaku2], []);
}
});
});

View File

@ -1,274 +0,0 @@
import { createEncoder } from "@waku/core";
import { IRateLimitProof, LightNode, ProtocolError } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
generateRandomUint8Array,
MessageCollector,
ServiceNode,
tearDownNodes,
TEST_STRING
} from "../../../src/index.js";
import {
messagePayload,
messageText,
runNodes,
TestContentTopic,
TestEncoder,
TestPubsubTopic,
TestShardInfo
} from "../utils.js";
describe("Waku Light Push: Single Node", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(15000);
let waku: LightNode;
let nwaku: ServiceNode;
let messageCollector: MessageCollector;
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
messageCollector = new MessageCollector(nwaku);
await nwaku.ensureSubscriptions([TestPubsubTopic]);
});
afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku);
});
TEST_STRING.forEach((testItem) => {
it(`Push message with ${testItem.description} payload`, async function () {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(testItem.value)
});
expect(pushResponse.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: testItem.value,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
});
it("Push 30 different messages", async function () {
const generateMessageText = (index: number): string => `M${index}`;
for (let i = 0; i < 30; i++) {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(generateMessageText(i))
});
expect(pushResponse.successes.length).to.eq(1);
}
expect(
await messageCollector.waitForMessages(30, {
pubsubTopic: TestPubsubTopic
})
).to.eq(true);
for (let i = 0; i < 30; i++) {
messageCollector.verifyReceivedMessage(i, {
expectedMessageText: generateMessageText(i),
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
}
});
it("Throws when trying to push message with empty payload", async function () {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: new Uint8Array()
});
expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
ProtocolError.EMPTY_PAYLOAD
);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(false);
});
TEST_STRING.forEach((testItem) => {
it(`Push message with content topic containing ${testItem.description}`, async function () {
const customEncoder = createEncoder({
contentTopic: testItem.value,
pubsubTopic: TestPubsubTopic
});
const pushResponse = await waku.lightPush.send(
customEncoder,
messagePayload
);
expect(pushResponse.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: testItem.value,
expectedPubsubTopic: TestPubsubTopic
});
});
});
it("Fails to push message with empty content topic", async function () {
try {
createEncoder({ contentTopic: "" });
expect.fail("Expected an error but didn't get one");
} catch (error) {
expect((error as Error).message).to.equal(
"Content topic must be specified"
);
}
});
it("Push message with meta", async function () {
const customTestEncoder = createEncoder({
contentTopic: TestContentTopic,
metaSetter: () => new Uint8Array(10),
pubsubTopic: TestPubsubTopic
});
const pushResponse = await waku.lightPush.send(
customTestEncoder,
messagePayload
);
expect(pushResponse.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
it("Fails to push message with large meta", async function () {
const customTestEncoder = createEncoder({
contentTopic: TestContentTopic,
pubsubTopic: TestPubsubTopic,
metaSetter: () => new Uint8Array(105024) // see the note below ***
});
// *** note: this test used 10 ** 6 when `nwaku` node had MaxWakuMessageSize == 1MiB ( 1*2^20 .)
// `nwaku` establishes the max lightpush msg size as `const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024`
// see: https://github.com/waku-org/nwaku/blob/07beea02095035f4f4c234ec2dec1f365e6955b8/waku/waku_lightpush/rpc_codec.nim#L15
// In the PR https://github.com/waku-org/nwaku/pull/2298 we reduced the MaxWakuMessageSize
// from 1MiB to 150KiB. Therefore, the 105024 number comes from substracting ( 1*2^20 - 150*2^10 )
// to the original 10^6 that this test had when MaxWakuMessageSize == 1*2^20
const pushResponse = await waku.lightPush.send(
customTestEncoder,
messagePayload
);
expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
ProtocolError.REMOTE_PEER_REJECTED
);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(false);
});
it("Push message with rate limit", async function () {
const rateLimitProof: IRateLimitProof = {
proof: utf8ToBytes("proofData"),
merkleRoot: utf8ToBytes("merkleRootData"),
epoch: utf8ToBytes("epochData"),
shareX: utf8ToBytes("shareXData"),
shareY: utf8ToBytes("shareYData"),
nullifier: utf8ToBytes("nullifierData"),
rlnIdentifier: utf8ToBytes("rlnIdentifierData")
};
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(messageText),
rateLimitProof: rateLimitProof
});
expect(pushResponse.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
[
Date.now() - 3600000 * 24 * 356,
Date.now() - 3600000,
Date.now() + 3600000
].forEach((testItem) => {
it(`Push message with custom timestamp: ${testItem}`, async function () {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(messageText),
timestamp: new Date(testItem)
});
expect(pushResponse.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedTimestamp: testItem,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
});
it("Push message equal or less that 1MB", async function () {
const bigPayload = generateRandomUint8Array(65536);
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: bigPayload
});
expect(pushResponse.successes.length).to.greaterThan(0);
});
it("Fails to push message bigger that 1MB", async function () {
const MB = 1024 ** 2;
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: generateRandomUint8Array(MB + 65536)
});
expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
ProtocolError.SIZE_TOO_BIG
);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(false);
});
});

View File

@ -1,460 +0,0 @@
import type { PeerId } from "@libp2p/interface";
import { createEncoder } from "@waku/core";
import {
ContentTopicInfo,
LightNode,
NetworkConfig,
Protocols,
ShardInfo,
SingleShardInfo
} from "@waku/interfaces";
import {
contentTopicToPubsubTopic,
contentTopicToShardIndex,
pubsubTopicToSingleShardInfo,
singleShardInfoToPubsubTopic
} from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { Context } from "mocha";
import {
afterEachCustom,
beforeEachCustom,
makeLogFileName,
MessageCollector,
ServiceNode,
tearDownNodes
} from "../../../src/index.js";
import { messageText, runNodes } from "../utils.js";
describe("Waku Light Push : Multiple PubsubTopics", function () {
this.timeout(30000);
let waku: LightNode;
let nwaku: ServiceNode;
let nwaku2: ServiceNode;
let messageCollector: MessageCollector;
const shardInfo: ShardInfo = { clusterId: 3, shards: [1, 2] };
const singleShardInfo1: SingleShardInfo = { clusterId: 3, shard: 1 };
const singleShardInfo2: SingleShardInfo = { clusterId: 3, shard: 2 };
const customPubsubTopic1 = singleShardInfoToPubsubTopic(singleShardInfo1);
const customPubsubTopic2 = singleShardInfoToPubsubTopic(singleShardInfo2);
const customContentTopic1 = "/test/2/waku-light-push/utf8";
const customContentTopic2 = "/test/3/waku-light-push/utf8";
const customEncoder1 = createEncoder({
pubsubTopicShardInfo: singleShardInfo1,
contentTopic: customContentTopic1
});
const customEncoder2 = createEncoder({
pubsubTopicShardInfo: singleShardInfo2,
contentTopic: customContentTopic2
});
let node1PeerId: PeerId;
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo);
messageCollector = new MessageCollector(nwaku);
node1PeerId = await nwaku.getPeerId();
});
afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], waku);
});
it("Push message on custom pubsubTopic", async function () {
const pushResponse = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes(messageText)
});
expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString());
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic1
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: customContentTopic1
});
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
const pushResponse1 = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1")
});
const pushResponse2 = await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2")
});
expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString());
expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString());
const messageCollector2 = new MessageCollector(nwaku);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic1
})
).to.eq(true);
expect(
await messageCollector2.waitForMessages(1, {
pubsubTopic: customPubsubTopic2
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1
});
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: customPubsubTopic2
});
});
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () {
// Set up and start a new nwaku node with Default PubsubTopic
nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
pubsubTopic: [singleShardInfoToPubsubTopic(singleShardInfo2)],
clusterId: singleShardInfo2.clusterId
});
await nwaku2.ensureSubscriptions([
singleShardInfoToPubsubTopic(singleShardInfo2)
]);
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
const messageCollector2 = new MessageCollector(nwaku2);
await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1")
});
await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2")
});
await messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic1
});
await messageCollector2.waitForMessages(1, {
pubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2)
});
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1
});
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2)
});
});
});
describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
this.timeout(30000);
let waku: LightNode;
let nwaku: ServiceNode;
let nwaku2: ServiceNode;
let messageCollector: MessageCollector;
const clusterId = 4;
const customContentTopic1 = "/waku/2/content/test.js";
const customContentTopic2 = "/myapp/1/latest/proto";
const autoshardingPubsubTopic1 = contentTopicToPubsubTopic(
customContentTopic1,
clusterId
);
const autoshardingPubsubTopic2 = contentTopicToPubsubTopic(
customContentTopic2,
clusterId
);
const shardInfo: ContentTopicInfo = {
clusterId,
contentTopics: [customContentTopic1, customContentTopic2]
};
const customEncoder1 = createEncoder({
contentTopic: customContentTopic1,
pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(autoshardingPubsubTopic1)
});
const customEncoder2 = createEncoder({
contentTopic: customContentTopic2,
pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2)
});
let node1PeerId: PeerId;
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo);
messageCollector = new MessageCollector(nwaku);
node1PeerId = await nwaku.getPeerId();
});
afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], waku);
});
it("Push message on custom pubsubTopic", async function () {
const pushResponse = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes(messageText)
});
expect(pushResponse.failures).to.be.empty;
expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString());
expect(
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: customContentTopic1
});
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
const pushResponse1 = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1")
});
const pushResponse2 = await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2")
});
expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString());
expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString());
const messageCollector2 = new MessageCollector(nwaku);
expect(
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1
})
).to.eq(true);
expect(
await messageCollector2.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic2
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1
});
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2
});
});
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () {
// Set up and start a new nwaku node with Default PubsubTopic
nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
pubsubTopic: [autoshardingPubsubTopic2],
clusterId: shardInfo.clusterId
});
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
const messageCollector2 = new MessageCollector(nwaku2);
await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1")
});
await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2")
});
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1
});
await messageCollector2.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic2
});
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1
});
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2
});
});
});
describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () {
this.timeout(30000);
let waku: LightNode;
let waku2: LightNode;
let nwaku: ServiceNode;
let nwaku2: ServiceNode;
let messageCollector: MessageCollector;
let ctx: Context;
const clusterId = 3;
const customContentTopic1 = "/waku/2/content/utf8";
const customContentTopic2 = "/myapp/1/latest/proto";
const autoshardingPubsubTopic1 = contentTopicToPubsubTopic(
customContentTopic1,
clusterId
);
const autoshardingPubsubTopic2 = contentTopicToPubsubTopic(
customContentTopic2,
clusterId
);
const shardInfo1 = {
clusterId,
shards: [contentTopicToShardIndex(customContentTopic1)]
};
const customEncoder1 = createEncoder({
contentTopic: customContentTopic1,
pubsubTopicShardInfo: shardInfo1
});
const shardInfo2 = {
clusterId,
shards: [contentTopicToShardIndex(customContentTopic2)]
};
const customEncoder2 = createEncoder({
contentTopic: customContentTopic2,
pubsubTopicShardInfo: shardInfo2
});
const testShardInfo: NetworkConfig = {
clusterId,
shards: [
contentTopicToShardIndex(customContentTopic1),
contentTopicToShardIndex(customContentTopic2)
]
};
let node1PeerId: PeerId;
beforeEachCustom(this, async () => {
ctx = this.ctx;
[nwaku, waku] = await runNodes(ctx, testShardInfo);
messageCollector = new MessageCollector(nwaku);
node1PeerId = await nwaku.getPeerId();
});
afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], [waku, waku2]);
});
it("Push message on custom pubsubTopic", async function () {
const pushResponse = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes(messageText)
});
expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString());
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic1
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: customContentTopic1
});
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
const pushResponse1 = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1")
});
const pushResponse2 = await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2")
});
expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString());
expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString());
const messageCollector2 = new MessageCollector(nwaku);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic1
})
).to.eq(true);
expect(
await messageCollector2.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic2
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1
});
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2
});
});
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () {
// Set up and start a new nwaku node with Default PubsubTopic
[nwaku2] = await runNodes(ctx, shardInfo2);
await nwaku2.ensureSubscriptions([autoshardingPubsubTopic2]);
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
const messageCollector2 = new MessageCollector(nwaku2);
const { failures: f1 } = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1")
});
const { failures: f2 } = await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2")
});
expect(f1).to.be.empty;
expect(f2).to.be.empty;
await messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic1
});
await messageCollector2.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic2
});
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1
});
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2
});
});
});

View File

@ -1,8 +1,11 @@
import { createEncoder } from "@waku/core"; import { createEncoder } from "@waku/core";
import { LightNode, NetworkConfig, Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk"; import { utf8ToBytes } from "@waku/sdk";
import { createLightNode } from "@waku/sdk";
import { contentTopicToPubsubTopic, Logger } from "@waku/utils"; import { contentTopicToPubsubTopic, Logger } from "@waku/utils";
import { Context } from "mocha";
import { runNodes } from "../filter/single_node/utils.js"; import { runNodes as runNodesBuilder, ServiceNode } from "../../src/index.js";
// Constants for test configuration. // Constants for test configuration.
export const log = new Logger("test:lightpush"); export const log = new Logger("test:lightpush");
@ -23,4 +26,13 @@ export const TestEncoder = createEncoder({
export const messageText = "Light Push works!"; export const messageText = "Light Push works!";
export const messagePayload = { payload: utf8ToBytes(messageText) }; export const messagePayload = { payload: utf8ToBytes(messageText) };
export { runNodes }; export const runNodes = (
context: Context,
shardInfo: NetworkConfig
): Promise<[ServiceNode, LightNode]> =>
runNodesBuilder<LightNode>({
context,
createNode: createLightNode,
protocols: [Protocols.LightPush, Protocols.Filter],
networkConfig: shardInfo
});

View File

@ -2,7 +2,6 @@ import { MetadataCodec } from "@waku/core";
import type { LightNode, ShardInfo } from "@waku/interfaces"; import type { LightNode, ShardInfo } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk"; import { createLightNode } from "@waku/sdk";
import { decodeRelayShard } from "@waku/utils"; import { decodeRelayShard } from "@waku/utils";
import { shardInfoToPubsubTopics } from "@waku/utils";
import chai, { expect } from "chai"; import chai, { expect } from "chai";
import chaiAsPromised from "chai-as-promised"; import chaiAsPromised from "chai-as-promised";
@ -42,7 +41,7 @@ describe("Metadata Protocol", function () {
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
clusterId: shardInfo.clusterId, clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo) shard: shardInfo.shards
}); });
const nwaku1Ma = await nwaku1.getMultiaddrWithId(); const nwaku1Ma = await nwaku1.getMultiaddrWithId();
@ -89,7 +88,7 @@ describe("Metadata Protocol", function () {
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
clusterId: shardInfo1.clusterId, clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1) shard: shardInfo1.shards
}); });
const nwaku1Ma = await nwaku1.getMultiaddrWithId(); const nwaku1Ma = await nwaku1.getMultiaddrWithId();
@ -136,7 +135,7 @@ describe("Metadata Protocol", function () {
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
clusterId: shardInfo1.clusterId, clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1) shard: shardInfo1.shards
}); });
const nwaku1Ma = await nwaku1.getMultiaddrWithId(); const nwaku1Ma = await nwaku1.getMultiaddrWithId();
@ -174,7 +173,7 @@ describe("Metadata Protocol", function () {
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
clusterId: shardInfo1.clusterId, clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1) shard: shardInfo1.shards
}); });
const nwaku1Ma = await nwaku1.getMultiaddrWithId(); const nwaku1Ma = await nwaku1.getMultiaddrWithId();
@ -209,7 +208,7 @@ describe("Metadata Protocol", function () {
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
clusterId: shardInfo.clusterId, clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo) shard: shardInfo.shards
}); });
const nwaku1Ma = await nwaku1.getMultiaddrWithId(); const nwaku1Ma = await nwaku1.getMultiaddrWithId();
@ -245,7 +244,7 @@ describe("Metadata Protocol", function () {
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
clusterId: shardInfo.clusterId, clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo) shard: shardInfo.shards
}); });
const nwaku1Ma = await nwaku1.getMultiaddrWithId(); const nwaku1Ma = await nwaku1.getMultiaddrWithId();

View File

@ -32,13 +32,15 @@ describe("Peer Exchange", function () {
nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1");
nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2");
await nwaku1.start({ await nwaku1.start({
pubsubTopic: [DefaultTestPubsubTopic], clusterId: DefaultTestShardInfo.clusterId,
shard: DefaultTestShardInfo.shards,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
relay: true relay: true
}); });
await nwaku2.start({ await nwaku2.start({
pubsubTopic: [DefaultTestPubsubTopic], clusterId: DefaultTestShardInfo.clusterId,
shard: DefaultTestShardInfo.shards,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
discv5BootstrapNode: (await nwaku1.info()).enrUri, discv5BootstrapNode: (await nwaku1.info()).enrUri,
@ -130,7 +132,8 @@ describe("Peer Exchange", function () {
nwaku3 = new ServiceNode(makeLogFileName(this) + "3"); nwaku3 = new ServiceNode(makeLogFileName(this) + "3");
await nwaku3.start({ await nwaku3.start({
pubsubTopic: [DefaultTestPubsubTopic], clusterId: DefaultTestShardInfo.clusterId,
shard: DefaultTestShardInfo.shards,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
discv5BootstrapNode: (await nwaku1.info()).enrUri, discv5BootstrapNode: (await nwaku1.info()).enrUri,

View File

@ -24,7 +24,13 @@ import {
export const log = new Logger("test:pe"); export const log = new Logger("test:pe");
const pubsubTopic = [singleShardInfoToPubsubTopic({ clusterId: 0, shard: 2 })]; const ShardInfo = { clusterId: 0, shards: [2] };
const pubsubTopic = [
singleShardInfoToPubsubTopic({
clusterId: ShardInfo.clusterId,
shard: ShardInfo.shards[0]
})
];
describe("Peer Exchange Query", function () { describe("Peer Exchange Query", function () {
this.timeout(30_000); this.timeout(30_000);
@ -47,21 +53,24 @@ describe("Peer Exchange Query", function () {
nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2");
nwaku3 = new ServiceNode(makeLogFileName(this.ctx) + "3"); nwaku3 = new ServiceNode(makeLogFileName(this.ctx) + "3");
await nwaku1.start({ await nwaku1.start({
pubsubTopic: pubsubTopic, shard: ShardInfo.shards,
clusterId: ShardInfo.clusterId,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
relay: true relay: true
}); });
nwaku1PeerId = await nwaku1.getPeerId(); nwaku1PeerId = await nwaku1.getPeerId();
await nwaku2.start({ await nwaku2.start({
pubsubTopic: pubsubTopic, shard: ShardInfo.shards,
clusterId: ShardInfo.clusterId,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
discv5BootstrapNode: (await nwaku1.info()).enrUri, discv5BootstrapNode: (await nwaku1.info()).enrUri,
relay: true relay: true
}); });
await nwaku3.start({ await nwaku3.start({
pubsubTopic: pubsubTopic, shard: ShardInfo.shards,
clusterId: ShardInfo.clusterId,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
discv5BootstrapNode: (await nwaku2.info()).enrUri, discv5BootstrapNode: (await nwaku2.info()).enrUri,

View File

@ -1,4 +1,4 @@
import { LightNode, ProtocolError, Protocols } from "@waku/interfaces"; import { LightNode, ProtocolError } from "@waku/interfaces";
import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk";
import { import {
contentTopicToPubsubTopic, contentTopicToPubsubTopic,
@ -8,11 +8,9 @@ import { expect } from "chai";
import { import {
afterEachCustom, afterEachCustom,
beforeEachCustom, runMultipleNodes,
makeLogFileName, ServiceNodesFleet,
MessageCollector, teardownNodesWithRedundancy
ServiceNode,
tearDownNodes
} from "../../src/index.js"; } from "../../src/index.js";
const ContentTopic = "/waku/2/content/test.js"; const ContentTopic = "/waku/2/content/test.js";
@ -21,43 +19,104 @@ const ContentTopic2 = "/myapp/1/latest/proto";
describe("Autosharding: Running Nodes", function () { describe("Autosharding: Running Nodes", function () {
this.timeout(50000); this.timeout(50000);
const clusterId = 10; const clusterId = 10;
let waku: LightNode; const numServiceNodes = 2;
let nwaku: ServiceNode;
let messageCollector: MessageCollector;
beforeEachCustom(this, async () => { let waku: LightNode | undefined = undefined;
nwaku = new ServiceNode(makeLogFileName(this.ctx)); let serviceNodes: ServiceNodesFleet | undefined = undefined;
messageCollector = new MessageCollector(nwaku);
});
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku); if (serviceNodes) {
await teardownNodesWithRedundancy(serviceNodes, waku ?? []);
}
}); });
describe("Different clusters and topics", function () { // js-waku allows autosharding for cluster IDs different than 1
// js-waku allows autosharding for cluster IDs different than 1 it("Cluster ID 0 - Default/Global Cluster", async function () {
it("Cluster ID 0 - Default/Global Cluster", async function () { const clusterId = 0;
const clusterId = 0;
const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; [serviceNodes, waku] = await runMultipleNodes(
await nwaku.start({ this.ctx,
store: true, { clusterId, contentTopics: [ContentTopic] },
lightpush: true, { lightpush: true, filter: true },
relay: true, false,
numServiceNodes,
true
);
const encoder = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: {
clusterId: clusterId, clusterId: clusterId,
pubsubTopic: pubsubTopics shard: contentTopicToShardIndex(ContentTopic)
}); }
});
await nwaku.ensureSubscriptions(pubsubTopics); const request = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
waku = await createLightNode({ expect(request.successes.length).to.eq(numServiceNodes);
networkConfig: { expect(
clusterId: clusterId, await serviceNodes.messageCollector.waitForMessages(1, {
contentTopics: [ContentTopic] pubsubTopic: encoder.pubsubTopic
} })
}); ).to.eq(true);
await waku.start(); });
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]); it("Non TWN Cluster", async function () {
const clusterId = 5;
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
{ clusterId, contentTopics: [ContentTopic] },
{ lightpush: true, filter: true },
false,
numServiceNodes,
true
);
const encoder = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: {
clusterId: clusterId,
shard: contentTopicToShardIndex(ContentTopic)
}
});
const request = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
expect(request.successes.length).to.eq(numServiceNodes);
expect(
await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: encoder.pubsubTopic
})
).to.eq(true);
});
const numTest = 10;
for (let i = 0; i < numTest; i++) {
// Random ContentTopic
const applicationName = `app${Math.floor(Math.random() * 100)}`; // Random application name app0 to app99
const version = Math.floor(Math.random() * 10) + 1; // Random version between 1 and 10
const topicName = `topic${Math.floor(Math.random() * 1000)}`; // Random topic name topic0 to topic999
const encodingList = ["proto", "json", "xml", "test.js", "utf8"]; // Potential encodings
const encoding =
encodingList[Math.floor(Math.random() * encodingList.length)]; // Random encoding
const ContentTopic = `/${applicationName}/${version}/${topicName}/${encoding}`;
it(`random auto sharding ${
i + 1
} - Cluster ID: ${clusterId}, Content Topic: ${ContentTopic}`, async function () {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
{ clusterId, contentTopics: [ContentTopic] },
{ lightpush: true, filter: true },
false,
numServiceNodes,
true
);
const encoder = createEncoder({ const encoder = createEncoder({
contentTopic: ContentTopic, contentTopic: ContentTopic,
@ -71,256 +130,129 @@ describe("Autosharding: Running Nodes", function () {
payload: utf8ToBytes("Hello World") payload: utf8ToBytes("Hello World")
}); });
expect(request.successes.length).to.eq(1); expect(request.successes.length).to.eq(numServiceNodes);
expect( expect(
await messageCollector.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessages(1, {
contentTopic: ContentTopic pubsubTopic: encoder.pubsubTopic
}) })
).to.eq(true); ).to.eq(true);
}); });
}
it("Non TWN Cluster", async function () { // TODO: replace with unit tests
const clusterId = 5; it("Wrong topic", async function () {
const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; const wrongTopic = "wrong_format";
await nwaku.start({ try {
store: true, contentTopicToPubsubTopic(wrongTopic, clusterId);
lightpush: true, throw new Error("Wrong topic should've thrown an error");
relay: true, } catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes("Content topic format is invalid")
) {
throw err;
}
}
});
it("configure the node with multiple content topics", async function () {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
{ clusterId, contentTopics: [ContentTopic, ContentTopic2] },
{ lightpush: true, filter: true },
false,
numServiceNodes,
true
);
const encoder1 = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: {
clusterId: clusterId, clusterId: clusterId,
pubsubTopic: pubsubTopics shard: contentTopicToShardIndex(ContentTopic)
}); }
await nwaku.ensureSubscriptions(pubsubTopics);
waku = await createLightNode({
networkConfig: {
clusterId: clusterId,
contentTopics: [ContentTopic]
}
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
const encoder = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: {
clusterId: clusterId,
shard: contentTopicToShardIndex(ContentTopic)
}
});
const request = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
expect(request.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: ContentTopic
})
).to.eq(true);
}); });
const numTest = 10; const encoder2 = createEncoder({
for (let i = 0; i < numTest; i++) { contentTopic: ContentTopic2,
// Random ContentTopic pubsubTopicShardInfo: {
const applicationName = `app${Math.floor(Math.random() * 100)}`; // Random application name app0 to app99 clusterId: clusterId,
const version = Math.floor(Math.random() * 10) + 1; // Random version between 1 and 10 shard: contentTopicToShardIndex(ContentTopic2)
const topicName = `topic${Math.floor(Math.random() * 1000)}`; // Random topic name topic0 to topic999 }
const encodingList = ["proto", "json", "xml", "test.js", "utf8"]; // Potential encodings });
const encoding =
encodingList[Math.floor(Math.random() * encodingList.length)]; // Random encoding
const ContentTopic = `/${applicationName}/${version}/${topicName}/${encoding}`;
it(`random auto sharding ${ const request1 = await waku.lightPush.send(encoder1, {
i + 1 payload: utf8ToBytes("Hello World")
} - Cluster ID: ${clusterId}, Content Topic: ${ContentTopic}`, async function () { });
const pubsubTopics = [ expect(request1.successes.length).to.eq(numServiceNodes);
contentTopicToPubsubTopic(ContentTopic, clusterId) expect(
]; await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: encoder1.pubsubTopic
})
).to.eq(true);
await nwaku.start({ const request2 = await waku.lightPush.send(encoder2, {
store: true, payload: utf8ToBytes("Hello World")
lightpush: true, });
relay: true, expect(request2.successes.length).to.eq(numServiceNodes);
clusterId: clusterId, expect(
pubsubTopic: pubsubTopics, await serviceNodes.messageCollector.waitForMessages(1, {
contentTopic: [ContentTopic] pubsubTopic: encoder2.pubsubTopic
}); })
).to.eq(true);
});
waku = await createLightNode({ it("using a protocol with unconfigured pubsub topic should fail", async function () {
networkConfig: { [serviceNodes, waku] = await runMultipleNodes(
clusterId: clusterId, this.ctx,
contentTopics: [ContentTopic] { clusterId, contentTopics: [ContentTopic] },
} { lightpush: true, filter: true },
}); false,
await waku.start(); numServiceNodes,
await waku.dial(await nwaku.getMultiaddrWithId()); true
await waku.waitForPeers([Protocols.LightPush]); );
const encoder = createEncoder({ // use a content topic that is not configured
contentTopic: ContentTopic, const encoder = createEncoder({
pubsubTopicShardInfo: { contentTopic: ContentTopic2,
clusterId: clusterId, pubsubTopicShardInfo: {
shard: contentTopicToShardIndex(ContentTopic) clusterId: clusterId,
} shard: contentTopicToShardIndex(ContentTopic2)
}); }
});
const request = await waku.lightPush.send(encoder, { const { successes, failures } = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World") payload: utf8ToBytes("Hello World")
}); });
expect(request.successes.length).to.eq(1); if (successes.length > 0 || failures?.length === 0) {
expect( throw new Error("The request should've thrown an error");
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: ContentTopic
})
).to.eq(true);
});
} }
it("Wrong topic", async function () { const errors = failures?.map((failure) => failure.error);
const wrongTopic = "wrong_format"; expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED);
try {
contentTopicToPubsubTopic(wrongTopic, clusterId);
throw new Error("Wrong topic should've thrown an error");
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes("Content topic format is invalid")
) {
throw err;
}
}
});
}); });
describe("Others", function () { it("start node with empty content topic", async function () {
it("configure the node with multiple content topics", async function () { try {
const pubsubTopics = [
contentTopicToPubsubTopic(ContentTopic, clusterId),
contentTopicToPubsubTopic(ContentTopic2, clusterId)
];
await nwaku.start({
store: true,
lightpush: true,
relay: true,
clusterId: clusterId,
pubsubTopic: pubsubTopics,
contentTopic: [ContentTopic, ContentTopic2]
});
waku = await createLightNode({ waku = await createLightNode({
networkConfig: { networkConfig: {
clusterId: clusterId, clusterId: clusterId,
// For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards contentTopics: []
contentTopics: [ContentTopic, ContentTopic2]
} }
}); });
await waku.start(); throw new Error(
await waku.dial(await nwaku.getMultiaddrWithId()); "Starting the node with no content topic should've thrown an error"
await waku.waitForPeers([Protocols.LightPush]); );
} catch (err) {
const encoder1 = createEncoder({ if (
contentTopic: ContentTopic, !(err instanceof Error) ||
pubsubTopicShardInfo: { !err.message.includes(
clusterId: clusterId, "Invalid content topics configuration: please provide at least one content topic"
shard: contentTopicToShardIndex(ContentTopic) )
} ) {
}); throw err;
const encoder2 = createEncoder({
contentTopic: ContentTopic2,
pubsubTopicShardInfo: {
clusterId: clusterId,
shard: contentTopicToShardIndex(ContentTopic2)
}
});
const request1 = await waku.lightPush.send(encoder1, {
payload: utf8ToBytes("Hello World")
});
expect(request1.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: ContentTopic
})
).to.eq(true);
const request2 = await waku.lightPush.send(encoder2, {
payload: utf8ToBytes("Hello World")
});
expect(request2.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: ContentTopic
})
).to.eq(true);
});
it("using a protocol with unconfigured pubsub topic should fail", async function () {
const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)];
await nwaku.start({
store: true,
lightpush: true,
relay: true,
clusterId: clusterId,
pubsubTopic: pubsubTopics,
contentTopic: [ContentTopic]
});
waku = await createLightNode({
networkConfig: {
clusterId: clusterId,
contentTopics: [ContentTopic]
}
});
await waku.start();
// use a content topic that is not configured
const encoder = createEncoder({
contentTopic: ContentTopic2,
pubsubTopicShardInfo: {
clusterId: clusterId,
shard: contentTopicToShardIndex(ContentTopic2)
}
});
const { successes, failures } = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
if (successes.length > 0 || failures?.length === 0) {
throw new Error("The request should've thrown an error");
} }
}
const errors = failures?.map((failure) => failure.error);
expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED);
});
it("start node with empty content topic", async function () {
try {
waku = await createLightNode({
networkConfig: {
clusterId: clusterId,
contentTopics: []
}
});
throw new Error(
"Starting the node with no content topic should've thrown an error"
);
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes(
"Invalid content topics configuration: please provide at least one content topic"
)
) {
throw err;
}
}
});
}); });
}); });

View File

@ -10,6 +10,7 @@ import {
} from "@waku/sdk"; } from "@waku/sdk";
import { import {
contentTopicToPubsubTopic, contentTopicToPubsubTopic,
contentTopicToShardIndex,
singleShardInfoToPubsubTopic singleShardInfoToPubsubTopic
} from "@waku/utils"; } from "@waku/utils";
import chai, { expect } from "chai"; import chai, { expect } from "chai";
@ -57,7 +58,6 @@ describe("Static Sharding: Peer Management", function () {
const shardInfo: ShardInfo = { clusterId: clusterId, shards: [2] }; const shardInfo: ShardInfo = { clusterId: clusterId, shards: [2] };
await nwaku1.start({ await nwaku1.start({
pubsubTopic: pubsubTopics,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
relay: true, relay: true,
@ -68,7 +68,6 @@ describe("Static Sharding: Peer Management", function () {
const enr1 = (await nwaku1.info()).enrUri; const enr1 = (await nwaku1.info()).enrUri;
await nwaku2.start({ await nwaku2.start({
pubsubTopic: pubsubTopics,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
discv5BootstrapNode: enr1, discv5BootstrapNode: enr1,
@ -80,7 +79,6 @@ describe("Static Sharding: Peer Management", function () {
const enr2 = (await nwaku2.info()).enrUri; const enr2 = (await nwaku2.info()).enrUri;
await nwaku3.start({ await nwaku3.start({
pubsubTopic: pubsubTopics,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
discv5BootstrapNode: enr2, discv5BootstrapNode: enr2,
@ -133,13 +131,9 @@ describe("Static Sharding: Peer Management", function () {
singleShardInfoToPubsubTopic({ clusterId: clusterId, shard: 2 }) singleShardInfoToPubsubTopic({ clusterId: clusterId, shard: 2 })
]; ];
const shardInfoToDial: ShardInfo = { clusterId: clusterId, shards: [2] }; const shardInfoToDial: ShardInfo = { clusterId: clusterId, shards: [2] };
const pubsubTopicsToIgnore = [
singleShardInfoToPubsubTopic({ clusterId: clusterId, shard: 1 })
];
// this service node is not subscribed to the shard // this service node is not subscribed to the shard
await nwaku1.start({ await nwaku1.start({
pubsubTopic: pubsubTopicsToIgnore,
relay: true, relay: true,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
@ -150,7 +144,6 @@ describe("Static Sharding: Peer Management", function () {
const enr1 = (await nwaku1.info()).enrUri; const enr1 = (await nwaku1.info()).enrUri;
await nwaku2.start({ await nwaku2.start({
pubsubTopic: pubsubTopicsToDial,
relay: true, relay: true,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
@ -162,7 +155,6 @@ describe("Static Sharding: Peer Management", function () {
const enr2 = (await nwaku2.info()).enrUri; const enr2 = (await nwaku2.info()).enrUri;
await nwaku3.start({ await nwaku3.start({
pubsubTopic: pubsubTopicsToDial,
relay: true, relay: true,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
@ -213,6 +205,7 @@ describe("Static Sharding: Peer Management", function () {
describe("Autosharding: Peer Management", function () { describe("Autosharding: Peer Management", function () {
const ContentTopic = "/myapp/1/latest/proto"; const ContentTopic = "/myapp/1/latest/proto";
const clusterId = 8; const clusterId = 8;
const Shard = [contentTopicToShardIndex(ContentTopic)];
describe("Peer Exchange", function () { describe("Peer Exchange", function () {
let waku: LightNode; let waku: LightNode;
@ -243,35 +236,35 @@ describe("Autosharding: Peer Management", function () {
}; };
await nwaku1.start({ await nwaku1.start({
pubsubTopic: pubsubTopics,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
relay: true, relay: true,
clusterId: clusterId, clusterId: clusterId,
shard: Shard,
contentTopic: [ContentTopic] contentTopic: [ContentTopic]
}); });
const enr1 = (await nwaku1.info()).enrUri; const enr1 = (await nwaku1.info()).enrUri;
await nwaku2.start({ await nwaku2.start({
pubsubTopic: pubsubTopics,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
discv5BootstrapNode: enr1, discv5BootstrapNode: enr1,
relay: true, relay: true,
clusterId: clusterId, clusterId: clusterId,
shard: Shard,
contentTopic: [ContentTopic] contentTopic: [ContentTopic]
}); });
const enr2 = (await nwaku2.info()).enrUri; const enr2 = (await nwaku2.info()).enrUri;
await nwaku3.start({ await nwaku3.start({
pubsubTopic: pubsubTopics,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
discv5BootstrapNode: enr2, discv5BootstrapNode: enr2,
relay: true, relay: true,
clusterId: clusterId, clusterId: clusterId,
shard: Shard,
contentTopic: [ContentTopic] contentTopic: [ContentTopic]
}); });
const nwaku3Ma = await nwaku3.getMultiaddrWithId(); const nwaku3Ma = await nwaku3.getMultiaddrWithId();
@ -322,38 +315,37 @@ describe("Autosharding: Peer Management", function () {
clusterId: clusterId, clusterId: clusterId,
contentTopics: [ContentTopic] contentTopics: [ContentTopic]
}; };
const pubsubTopicsToIgnore = [contentTopicToPubsubTopic(ContentTopic, 3)];
// this service node is not subscribed to the shard // this service node is not subscribed to the shard
await nwaku1.start({ await nwaku1.start({
pubsubTopic: pubsubTopicsToIgnore,
relay: true, relay: true,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
clusterId: 3 clusterId: 3,
shard: Shard
}); });
const enr1 = (await nwaku1.info()).enrUri; const enr1 = (await nwaku1.info()).enrUri;
await nwaku2.start({ await nwaku2.start({
pubsubTopic: pubsubTopicsToDial,
relay: true, relay: true,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
discv5BootstrapNode: enr1, discv5BootstrapNode: enr1,
clusterId: clusterId, clusterId: clusterId,
shard: Shard,
contentTopic: [ContentTopic] contentTopic: [ContentTopic]
}); });
const enr2 = (await nwaku2.info()).enrUri; const enr2 = (await nwaku2.info()).enrUri;
await nwaku3.start({ await nwaku3.start({
pubsubTopic: pubsubTopicsToDial,
relay: true, relay: true,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
discv5BootstrapNode: enr2, discv5BootstrapNode: enr2,
clusterId: clusterId, clusterId: clusterId,
shard: Shard,
contentTopic: [ContentTopic] contentTopic: [ContentTopic]
}); });
const nwaku3Ma = await nwaku3.getMultiaddrWithId(); const nwaku3Ma = await nwaku3.getMultiaddrWithId();

View File

@ -1,10 +1,4 @@
import { import { LightNode, ProtocolError, SingleShardInfo } from "@waku/interfaces";
LightNode,
ProtocolError,
Protocols,
ShardInfo,
SingleShardInfo
} from "@waku/interfaces";
import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk";
import { import {
shardInfoToPubsubTopics, shardInfoToPubsubTopics,
@ -16,172 +10,134 @@ import { expect } from "chai";
import { import {
afterEachCustom, afterEachCustom,
beforeEachCustom, beforeEachCustom,
makeLogFileName, runMultipleNodes,
MessageCollector, ServiceNodesFleet,
ServiceNode, teardownNodesWithRedundancy
tearDownNodes
} from "../../src/index.js"; } from "../../src/index.js";
const ContentTopic = "/waku/2/content/test.js"; const ContentTopic = "/waku/2/content/test.js";
describe("Static Sharding: Running Nodes", function () { describe("Static Sharding: Running Nodes", function () {
this.timeout(15_000); this.timeout(15_000);
let waku: LightNode; const numServiceNodes = 2;
let nwaku: ServiceNode;
let messageCollector: MessageCollector;
beforeEachCustom(this, async () => { let waku: LightNode | undefined = undefined;
nwaku = new ServiceNode(makeLogFileName(this.ctx)); let serviceNodes: ServiceNodesFleet | undefined = undefined;
messageCollector = new MessageCollector(nwaku);
});
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku); if (serviceNodes) {
}); await teardownNodesWithRedundancy(serviceNodes, waku ?? []);
describe("Different clusters and shards", function () {
it("shard 0", async function () {
const singleShardInfo = { clusterId: 0, shard: 0 };
const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
await nwaku.start({
store: true,
lightpush: true,
relay: true,
pubsubTopic: shardInfoToPubsubTopics(shardInfo)
});
await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo));
waku = await createLightNode({
networkConfig: shardInfo
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
const encoder = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: singleShardInfo
});
expect(encoder.pubsubTopic).to.eq(
singleShardInfoToPubsubTopic(singleShardInfo)
);
const request = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
expect(request.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0]
})
).to.eq(true);
});
// dedicated test for Default Cluster ID 0
it("Cluster ID 0 - Default/Global Cluster", async function () {
const singleShardInfo = { clusterId: 0, shard: 1 };
const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
await nwaku.start({
store: true,
lightpush: true,
relay: true,
pubsubTopic: shardInfoToPubsubTopics(shardInfo)
});
await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo));
waku = await createLightNode({
networkConfig: shardInfo
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
const encoder = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: singleShardInfo
});
const request = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
expect(request.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0]
})
).to.eq(true);
});
const numTest = 10;
for (let i = 0; i < numTest; i++) {
// Random clusterId between 2 and 1000
const clusterId = Math.floor(Math.random() * 999) + 2;
// Random shardId between 1 and 1000
const shardId = Math.floor(Math.random() * 1000) + 1;
it(`random static sharding ${
i + 1
} - Cluster ID: ${clusterId}, Shard ID: ${shardId}`, async function () {
afterEach(async () => {
await tearDownNodes(nwaku, waku);
});
const singleShardInfo = { clusterId: clusterId, shard: shardId };
const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
await nwaku.start({
store: true,
lightpush: true,
relay: true,
clusterId: clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo)
});
waku = await createLightNode({
networkConfig: shardInfo
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
const encoder = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: singleShardInfo
});
const request = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
expect(request.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0]
})
).to.eq(true);
});
} }
}); });
it("shard 0", async function () {
const singleShardInfo = { clusterId: 0, shard: 0 };
const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
shardInfo,
{ lightpush: true, filter: true },
false,
numServiceNodes,
true
);
const encoder = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: singleShardInfo
});
expect(encoder.pubsubTopic).to.eq(
singleShardInfoToPubsubTopic(singleShardInfo)
);
const request = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
expect(request.successes.length).to.eq(numServiceNodes);
expect(
await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: encoder.pubsubTopic
})
).to.eq(true);
});
// dedicated test for Default Cluster ID 0
it("Cluster ID 0 - Default/Global Cluster", async function () {
const singleShardInfo = { clusterId: 0, shard: 1 };
const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
shardInfo,
{ lightpush: true, filter: true },
false,
numServiceNodes,
true
);
const encoder = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: singleShardInfo
});
const request = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
expect(request.successes.length).to.eq(numServiceNodes);
expect(
await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0]
})
).to.eq(true);
});
const numTest = 10;
for (let i = 0; i < numTest; i++) {
// Random clusterId between 2 and 1000
const clusterId = Math.floor(Math.random() * 999) + 2;
// Random shardId between 1 and 1000
const shardId = Math.floor(Math.random() * 1000) + 1;
it(`random static sharding ${
i + 1
} - Cluster ID: ${clusterId}, Shard ID: ${shardId}`, async function () {
const singleShardInfo = { clusterId: clusterId, shard: shardId };
const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
shardInfo,
{ lightpush: true, filter: true },
false,
numServiceNodes,
true
);
const encoder = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: singleShardInfo
});
const request = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
expect(request.successes.length).to.eq(numServiceNodes);
expect(
await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0]
})
).to.eq(true);
});
}
describe("Others", function () { describe("Others", function () {
const clusterId = 2; const clusterId = 2;
let shardInfo: ShardInfo;
const shardInfoFirstShard: ShardInfo = {
clusterId: clusterId,
shards: [2]
};
const shardInfoBothShards: ShardInfo = {
clusterId: clusterId,
shards: [2, 3]
};
const singleShardInfo1: SingleShardInfo = { const singleShardInfo1: SingleShardInfo = {
clusterId: clusterId, clusterId: clusterId,
shard: 2 shard: 2
@ -192,32 +148,23 @@ describe("Static Sharding: Running Nodes", function () {
}; };
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
shardInfo = { [serviceNodes, waku] = await runMultipleNodes(
clusterId: clusterId, this.ctx,
shards: [2] { clusterId, shards: [2, 3] },
}; { lightpush: true, filter: true },
false,
await nwaku.start({ numServiceNodes,
store: true, true
lightpush: true, );
relay: true,
clusterId: clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo)
});
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku); if (serviceNodes) {
await teardownNodesWithRedundancy(serviceNodes, waku ?? []);
}
}); });
it("configure the node with multiple pubsub topics", async function () { it("configure the node with multiple pubsub topics", async function () {
waku = await createLightNode({
networkConfig: shardInfoBothShards
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
const encoder1 = createEncoder({ const encoder1 = createEncoder({
contentTopic: ContentTopic, contentTopic: ContentTopic,
pubsubTopicShardInfo: singleShardInfo1 pubsubTopicShardInfo: singleShardInfo1
@ -228,49 +175,53 @@ describe("Static Sharding: Running Nodes", function () {
pubsubTopicShardInfo: singleShardInfo2 pubsubTopicShardInfo: singleShardInfo2
}); });
const request1 = await waku.lightPush.send(encoder1, { const request1 = await waku?.lightPush.send(encoder1, {
payload: utf8ToBytes("Hello World2") payload: utf8ToBytes("Hello World2")
}); });
expect(request1.successes.length).to.eq(1);
expect(request1?.successes.length).to.eq(numServiceNodes);
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes?.messageCollector.waitForMessages(1, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] pubsubTopic: encoder1.pubsubTopic
}) })
).to.eq(true); ).to.eq(true);
const request2 = await waku.lightPush.send(encoder2, { const request2 = await waku?.lightPush.send(encoder2, {
payload: utf8ToBytes("Hello World3") payload: utf8ToBytes("Hello World3")
}); });
expect(request2.successes.length).to.eq(1);
expect(request2?.successes.length).to.eq(numServiceNodes);
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes?.messageCollector.waitForMessages(1, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] pubsubTopic: encoder2.pubsubTopic
}) })
).to.eq(true); ).to.eq(true);
}); });
it("using a protocol with unconfigured pubsub topic should fail", async function () { it("using a protocol with unconfigured pubsub topic should fail", async function () {
this.timeout(15_000); this.timeout(15_000);
waku = await createLightNode({
networkConfig: shardInfoFirstShard
});
await waku.start();
// use a pubsub topic that is not configured // use a pubsub topic that is not configured
const encoder = createEncoder({ const encoder = createEncoder({
contentTopic: ContentTopic, contentTopic: ContentTopic,
pubsubTopicShardInfo: singleShardInfo2 pubsubTopicShardInfo: {
clusterId,
shard: 4
}
}); });
const { successes, failures } = await waku.lightPush.send(encoder, { const request = await waku?.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World") payload: utf8ToBytes("Hello World")
}); });
if (successes.length > 0 || failures?.length === 0) { if (
(request?.successes.length || 0) > 0 ||
request?.failures?.length === 0
) {
throw new Error("The request should've thrown an error"); throw new Error("The request should've thrown an error");
} }
const errors = failures?.map((failure) => failure.error); const errors = request?.failures?.map((failure) => failure.error);
expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED);
}); });

View File

@ -109,8 +109,8 @@ describe("Waku Store, custom pubsub topic", function () {
nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
await nwaku2.start({ await nwaku2.start({
store: true, store: true,
pubsubTopic: [TestDecoder2.pubsubTopic],
clusterId: TestShardInfo.clusterId, clusterId: TestShardInfo.clusterId,
shard: TestShardInfo.shards,
relay: true relay: true
}); });
await nwaku2.ensureSubscriptions([TestDecoder2.pubsubTopic]); await nwaku2.ensureSubscriptions([TestDecoder2.pubsubTopic]);
@ -153,7 +153,8 @@ describe("Waku Store, custom pubsub topic", function () {
}); });
}); });
describe("Waku Store (Autosharding), custom pubsub topic", function () { // TODO: blocked by https://github.com/waku-org/nwaku/issues/3362
describe.skip("Waku Store (Autosharding), custom pubsub topic", function () {
this.timeout(15000); this.timeout(15000);
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let nwaku: ServiceNode;
@ -162,6 +163,7 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () {
const customContentTopic1 = "/waku/2/content/utf8"; const customContentTopic1 = "/waku/2/content/utf8";
const customContentTopic2 = "/myapp/1/latest/proto"; const customContentTopic2 = "/myapp/1/latest/proto";
const clusterId = 5; const clusterId = 5;
const Shard2 = [1];
const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( const autoshardingPubsubTopic1 = contentTopicToPubsubTopic(
customContentTopic1, customContentTopic1,
clusterId clusterId
@ -244,10 +246,10 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () {
nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
await nwaku2.start({ await nwaku2.start({
store: true, store: true,
pubsubTopic: [autoshardingPubsubTopic2],
contentTopic: [customContentTopic2], contentTopic: [customContentTopic2],
relay: true, relay: true,
clusterId clusterId,
shard: Shard2
}); });
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
@ -368,9 +370,9 @@ describe("Waku Store (named sharding), custom pubsub topic", function () {
nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
await nwaku2.start({ await nwaku2.start({
store: true, store: true,
pubsubTopic: [TestDecoder2.pubsubTopic],
relay: true, relay: true,
clusterId: TestShardInfo.clusterId clusterId: TestShardInfo.clusterId,
shard: TestShardInfo.shards
}); });
await nwaku2.ensureSubscriptions([TestDecoder2.pubsubTopic]); await nwaku2.ensureSubscriptions([TestDecoder2.pubsubTopic]);

View File

@ -13,7 +13,7 @@ import {
tearDownNodes tearDownNodes
} from "../src/index.js"; } from "../src/index.js";
import { runNodes } from "./filter/single_node/utils.js"; import { runNodes } from "./light-push/utils.js";
chai.use(chaiAsPromised); chai.use(chaiAsPromised);

View File

@ -50,7 +50,8 @@ describe("Wait for remote peer", function () {
store: false, store: false,
filter: false, filter: false,
lightpush: false, lightpush: false,
pubsubTopic: [DefaultTestPubsubTopic] clusterId: DefaultTestShardInfo.clusterId,
shard: DefaultTestShardInfo.shards
}); });
const multiAddrWithId = await nwaku.getMultiaddrWithId(); const multiAddrWithId = await nwaku.getMultiaddrWithId();