chore: upgrade nwaku to 0.34.0 and update the test suite for compatibility (#2170)

* chore: upgrade nwaku to v0.33.1

* chore: upgrade to nwaku 0.34.0

* feat: connect nwaku nodes amongst each other over relay

* chore(lightpush): use multiple service nodes for lightpush (instead of just one)
- nwaku now requires at least one other connected node

* chore: all single-node lightpush requests are now expected to fail

* chore: update sharding tests

* chore: update tests

* chore: improve Docker network config reliability

* chore: deduplicate ECIES-encrypted payloads

* chore: update to precise expects

* fix: return early if expect passes

* chore: lightpush 5 times instead of 30

* fix: deduplication should happen in application-specific scenarios

* chore: update mocha config + fix ephemeral tests

* chore: reinstall deps after rebase

* chore: attempt stability for test suite

* fix: store tests now use multiple nodes, delete unneeded test

* fix: memory leak

* chore: replace while loop with a timeout promise (see the sketch after this list)

* chore: remove redundant nodes startup

* chore: add delays for nwaku setup
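
Below is a minimal sketch of the timeout-promise pattern referenced above, not the exact code from this commit: a polling check is raced against a hard timeout instead of spinning in a bare while loop. The `waitFor` helper, its `predicate` parameter, and the interval values are illustrative only.

// Sketch only: race a polling promise against a timeout promise (TypeScript).
async function waitFor(
  predicate: () => boolean,
  maxTimeoutMs = 30_000,
  pollIntervalMs = 100
): Promise<boolean> {
  // Resolves false once the overall time budget is exhausted.
  const timeoutPromise = new Promise<boolean>((resolve) => {
    setTimeout(() => resolve(false), maxTimeoutMs);
  });

  // Resolves true as soon as the predicate holds, re-checking on a short interval.
  const pollPromise = new Promise<boolean>((resolve) => {
    const check = (): void => {
      if (predicate()) {
        resolve(true);
        return;
      }
      setTimeout(check, pollIntervalMs);
    };
    check();
  });

  // Whichever promise settles first decides the result.
  return Promise.race([timeoutPromise, pollPromise]);
}

// Example usage: wait until at least 5 messages have been collected.
const received: string[] = [];
void waitFor(() => received.length >= 5, 5_000).then((ok) => {
  console.log(ok ? "received enough messages" : "timed out waiting for messages");
});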
Danish Arora 2025-02-17 19:21:33 +05:30 committed by GitHub
parent 0a0a92bccb
commit 26ab836900
20 changed files with 695 additions and 706 deletions


@@ -122,14 +122,14 @@ jobs:
     uses: ./.github/workflows/test-node.yml
     secrets: inherit
     with:
-      nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.31.0' }}
+      nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.34.0' }}
       test_type: node
       allure_reports: true

   node_optional:
     uses: ./.github/workflows/test-node.yml
     with:
-      nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.31.0' }}
+      nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.34.0' }}
       test_type: node-optional

   node_with_nwaku_master:


@@ -33,6 +33,7 @@ env:
 jobs:
   node:
     runs-on: ubuntu-latest
+    timeout-minutes: 60 # Add a 1-hour timeout to fail faster
     env:
       WAKUNODE_IMAGE: ${{ inputs.nim_wakunode_image }}
       ALLURE_REPORTS: ${{ inputs.allure_reports }}


@@ -7,7 +7,8 @@ const config = {
     'loader=ts-node/esm'
   ],
   exit: true,
-  retries: 4
+  retries: 2,
+  timeout: 150_000
 };

 if (process.env.CI) {


@@ -18,7 +18,7 @@ export default class Dockerode {
   public containerId?: string;
   private static network: Docker.Network;
-  private containerIp: string;
+  public readonly containerIp: string;

   private constructor(imageName: string, containerIp: string) {
     this.docker = new Docker();
@@ -107,6 +107,7 @@ export default class Dockerode {
     const container = await this.docker.createContainer({
       Image: this.IMAGE_NAME,
       HostConfig: {
+        NetworkMode: NETWORK_NAME,
         AutoRemove: true,
         PortBindings: {
           [`${restPort}/tcp`]: [{ HostPort: restPort.toString() }],
@@ -116,6 +117,8 @@ export default class Dockerode {
           [`${discv5UdpPort}/udp`]: [{ HostPort: discv5UdpPort.toString() }]
         })
       },
+      Dns: ["8.8.8.8"],
+      Links: [],
       Mounts: args.rlnRelayEthClientAddress
         ? [
             {
@@ -135,18 +138,19 @@ export default class Dockerode {
           [`${discv5UdpPort}/udp`]: {}
         })
       },
-      Cmd: argsArrayWithIP
-    });
-    await container.start();
-
-    await Dockerode.network.connect({
-      Container: container.id,
-      EndpointConfig: {
-        IPAMConfig: {
-          IPv4Address: this.containerIp
-        }
-      }
-    });
+      Cmd: argsArrayWithIP,
+      NetworkingConfig: {
+        EndpointsConfig: {
+          [NETWORK_NAME]: {
+            IPAMConfig: {
+              IPv4Address: this.containerIp
+            }
+          }
+        }
+      }
+    });
+
+    await container.start();

     const logStream = fs.createWriteStream(logPath);
     container.logs(

@ -29,24 +29,29 @@ export class ServiceNodesFleet {
_args?: Args, _args?: Args,
withoutFilter = false withoutFilter = false
): Promise<ServiceNodesFleet> { ): Promise<ServiceNodesFleet> {
const serviceNodePromises = Array.from( const nodes: ServiceNode[] = [];
{ length: nodesToCreate },
async () => {
const node = new ServiceNode(
makeLogFileName(mochaContext) +
Math.random().toString(36).substring(7)
);
const args = getArgs(networkConfig, _args); for (let i = 0; i < nodesToCreate; i++) {
await node.start(args, { const node = new ServiceNode(
retries: 3 makeLogFileName(mochaContext) + Math.random().toString(36).substring(7)
}); );
return node; const args = getArgs(networkConfig, _args);
// If this is not the first node and previous node had a nodekey, use its multiaddr as static node
if (i > 0) {
const prevNode = nodes[i - 1];
const multiaddr = await prevNode.getExternalWebsocketMultiaddr();
args.staticnode = multiaddr;
} }
);
const nodes = await Promise.all(serviceNodePromises); await node.start(args, {
retries: 3
});
nodes.push(node);
}
return new ServiceNodesFleet(nodes, withoutFilter, strictChecking); return new ServiceNodesFleet(nodes, withoutFilter, strictChecking);
} }
@ -107,7 +112,24 @@ export class ServiceNodesFleet {
return relayMessages.every((message) => message); return relayMessages.every((message) => message);
} }
public async confirmMessageLength(numMessages: number): Promise<void> { public async confirmMessageLength(
numMessages: number,
{ encryptedPayload }: { encryptedPayload?: boolean } = {
encryptedPayload: false
}
): Promise<void> {
if (encryptedPayload) {
const filteredMessageList = Array.from(
new Set(
this.messageCollector.messageList
.filter((msg) => msg.payload?.toString)
.map((msg) => msg.payload.toString())
)
);
expect(filteredMessageList.length).to.equal(numMessages);
return;
}
if (this.strictChecking) { if (this.strictChecking) {
await Promise.all( await Promise.all(
this.nodes.map(async (node) => this.nodes.map(async (node) =>
@ -132,7 +154,7 @@ export class ServiceNodesFleet {
class MultipleNodesMessageCollector { class MultipleNodesMessageCollector {
public callback: (msg: DecodedMessage) => void = () => {}; public callback: (msg: DecodedMessage) => void = () => {};
protected messageList: Array<DecodedMessage> = []; public readonly messageList: Array<DecodedMessage> = [];
public constructor( public constructor(
private messageCollectors: MessageCollector[], private messageCollectors: MessageCollector[],
private relayNodes?: ServiceNode[], private relayNodes?: ServiceNode[],
@ -182,21 +204,21 @@ class MultipleNodesMessageCollector {
} }
): boolean { ): boolean {
if (this.strictChecking) { if (this.strictChecking) {
return this.messageCollectors.every((collector) => { return this.messageCollectors.every((collector, _i) => {
try { try {
collector.verifyReceivedMessage(index, options); collector.verifyReceivedMessage(index, options);
return true; // Verification successful return true;
} catch (error) { } catch (error) {
return false; // Verification failed, continue with the next collector return false;
} }
}); });
} else { } else {
return this.messageCollectors.some((collector) => { return this.messageCollectors.some((collector, _i) => {
try { try {
collector.verifyReceivedMessage(index, options); collector.verifyReceivedMessage(index, options);
return true; // Verification successful return true;
} catch (error) { } catch (error) {
return false; // Verification failed, continue with the next collector return false;
} }
}); });
} }
@ -213,37 +235,122 @@ class MultipleNodesMessageCollector {
exact?: boolean; exact?: boolean;
} }
): Promise<boolean> { ): Promise<boolean> {
const startTime = Date.now();
const pubsubTopic = options?.pubsubTopic || DefaultTestPubsubTopic; const pubsubTopic = options?.pubsubTopic || DefaultTestPubsubTopic;
const timeoutDuration = options?.timeoutDuration || 400; const timeoutDuration = options?.timeoutDuration || 400;
const maxTimeout = Math.min(timeoutDuration * numMessages, 30000);
const exact = options?.exact || false;
try {
const timeoutPromise = new Promise<boolean>((resolve) => {
setTimeout(() => {
log.warn(`Timeout waiting for messages after ${maxTimeout}ms`);
resolve(false);
}, maxTimeout);
});
const checkMessagesPromise = new Promise<boolean>((resolve) => {
const checkMessages = (): void => {
// Check local messages
if (this.messageList.length >= numMessages) {
if (exact && this.messageList.length !== numMessages) {
log.warn(
`Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}`
);
resolve(false);
return;
}
resolve(true);
return;
}
if (this.relayNodes) {
void Promise.all(
this.relayNodes.map((node) => node.messages(pubsubTopic))
).then((nodeMessages) => {
const hasEnoughMessages = this.strictChecking
? nodeMessages.every((msgs) => msgs.length >= numMessages)
: nodeMessages.some((msgs) => msgs.length >= numMessages);
if (hasEnoughMessages) {
resolve(true);
return;
}
setTimeout(checkMessages, 100);
});
} else {
setTimeout(checkMessages, 100);
}
};
// Start checking
checkMessages();
});
// Race between timeout and message checking
return Promise.race([timeoutPromise, checkMessagesPromise]);
} catch (error) {
log.error("Error in waitForMessages:", error);
return false;
}
}
/**
* Waits for a total number of messages across all nodes using autosharding.
*/
public async waitForMessagesAutosharding(
numMessages: number,
options?: {
contentTopic: string;
timeoutDuration?: number;
exact?: boolean;
}
): Promise<boolean> {
const startTime = Date.now();
const timeoutDuration = options?.timeoutDuration || 400;
const exact = options?.exact || false; const exact = options?.exact || false;
while (this.messageList.length < numMessages) { while (this.messageList.length < numMessages) {
if (this.relayNodes) { if (this.relayNodes) {
if (this.strictChecking) { if (this.strictChecking) {
// In strict mode, all nodes must have the messages
const results = await Promise.all( const results = await Promise.all(
this.relayNodes.map(async (node) => { this.messageCollectors.map(async (collector) => {
const msgs = await node.messages(pubsubTopic); return collector.waitForMessagesAutosharding(
return msgs.length >= numMessages; numMessages,
options
);
}) })
); );
return results.every((result) => result); if (results.every((result) => result)) {
return true;
}
} else { } else {
// In non-strict mode, at least one node must have the messages
const results = await Promise.all( const results = await Promise.all(
this.relayNodes.map(async (node) => { this.messageCollectors.map(async (collector) => {
const msgs = await node.messages(pubsubTopic); return collector.waitForMessagesAutosharding(
return msgs.length >= numMessages; numMessages,
options
);
}) })
); );
return results.some((result) => result); if (results.some((result) => result)) {
return true;
}
} }
}
if (Date.now() - startTime > timeoutDuration * numMessages) { if (Date.now() - startTime > timeoutDuration * numMessages) {
return false; return false;
} }
await delay(10); await delay(10);
} else {
// If no relay nodes, just wait for messages in the list
if (Date.now() - startTime > timeoutDuration * numMessages) {
return false;
}
await delay(10);
}
} }
if (exact) { if (exact) {
@ -253,7 +360,6 @@ class MultipleNodesMessageCollector {
log.warn( log.warn(
`Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}` `Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}`
); );
return false; return false;
} }
} else { } else {


@@ -27,7 +27,7 @@ const WAKU_SERVICE_NODE_PARAMS =
 const NODE_READY_LOG_LINE = "Node setup complete";

 export const DOCKER_IMAGE_NAME =
-  process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0";
+  process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.34.0";

 const isGoWaku = DOCKER_IMAGE_NAME.includes("go-waku");
@@ -402,6 +402,15 @@ export class ServiceNode {
       throw `${this.type} container hasn't started`;
     }
   }
+
+  public async getExternalWebsocketMultiaddr(): Promise<string | undefined> {
+    if (!this.docker?.container) {
+      return undefined;
+    }
+    const containerIp = this.docker.containerIp;
+    const peerId = await this.getPeerId();
+    return `/ip4/${containerIp}/tcp/${this.websocketPort}/ws/p2p/${peerId}`;
+  }
 }

 export function defaultArgs(): Args {
@@ -422,3 +431,20 @@ interface RpcInfoResponse {
   listenAddresses: string[];
   enrUri?: string;
 }
+
+export async function verifyServiceNodesConnected(
+  nodes: ServiceNode[]
+): Promise<boolean> {
+  for (const node of nodes) {
+    const peers = await node.peers();
+    log.info(`Service node ${node.containerName} peers:`, peers.length);
+    log.info(`Service node ${node.containerName} peers:`, peers);
+    if (nodes.length > 1 && peers.length === 0) {
+      log.error(`Service node ${node.containerName} has no peers connected`);
+      return false;
+    }
+  }
+  return true;
+}


@@ -3,7 +3,7 @@ import { promisify } from "util";

 const execAsync = promisify(exec);

-const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0";
+const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.34.0";

 async function main() {
   try {
@@ -40,6 +40,15 @@ async function main() {
   mocha.on("exit", (code) => {
     console.log(`Mocha tests exited with code ${code}`);
+    try {
+      execAsync(
+        `docker ps -q -f "ancestor=${WAKUNODE_IMAGE}" | xargs -r docker stop`
+      ).catch((error) => {
+        console.error("Error cleaning up containers:", error);
+      });
+    } catch (error) {
+      console.error("Error cleaning up containers:", error);
+    }
     process.exit(code || 0);
   });
 }


@@ -7,7 +7,7 @@ import { ServiceNode } from "./lib/index.js";

 const execAsync = promisify(exec);

-const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0";
+const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.34.0";

 const containerName = "rln_tree";

 async function syncRlnTree() {


@ -7,12 +7,13 @@ import {
Protocols Protocols
} from "@waku/interfaces"; } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk"; import { createLightNode } from "@waku/sdk";
import { derivePubsubTopicsFromNetworkConfig } from "@waku/utils"; import { delay, derivePubsubTopicsFromNetworkConfig } from "@waku/utils";
import { Context } from "mocha"; import { Context } from "mocha";
import pRetry from "p-retry"; import pRetry from "p-retry";
import { NOISE_KEY_1 } from "../constants.js"; import { NOISE_KEY_1 } from "../constants.js";
import { ServiceNodesFleet } from "../lib/index.js"; import { ServiceNodesFleet } from "../lib/index.js";
import { verifyServiceNodesConnected } from "../lib/service_node.js";
import { Args } from "../types.js"; import { Args } from "../types.js";
import { waitForConnections } from "./waitForConnections.js"; import { waitForConnections } from "./waitForConnections.js";
@ -35,6 +36,13 @@ export async function runMultipleNodes(
withoutFilter withoutFilter
); );
if (numServiceNodes > 1) {
const success = await verifyServiceNodesConnected(serviceNodes.nodes);
if (!success) {
throw new Error("Failed to verify that service nodes are connected");
}
}
const wakuOptions: CreateNodeOptions = { const wakuOptions: CreateNodeOptions = {
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
libp2p: { libp2p: {
@ -50,24 +58,34 @@ export async function runMultipleNodes(
throw new Error("Failed to initialize waku"); throw new Error("Failed to initialize waku");
} }
//TODO: reinvestigate the need for these delays with nwaku:0.35.0: https://github.com/waku-org/js-waku/issues/2264
await delay(2000);
for (const node of serviceNodes.nodes) { for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId()); await waku.dial(await node.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
await node.ensureSubscriptions( const success = await node.ensureSubscriptions(
derivePubsubTopicsFromNetworkConfig(networkConfig) derivePubsubTopicsFromNetworkConfig(networkConfig)
); );
if (!success) {
const wakuConnections = waku.libp2p.getConnections(); throw new Error("Failed to ensure subscriptions");
if (wakuConnections.length < 1) {
throw new Error(`Expected at least 1 connection for js-waku.`);
} }
//TODO: reinvestigate the need for these delays with nwaku:0.35.0: https://github.com/waku-org/js-waku/issues/2264
await delay(2000);
await node.waitForLog(waku.libp2p.peerId.toString(), 100); await node.waitForLog(waku.libp2p.peerId.toString(), 100);
} }
await waitForConnections(numServiceNodes, waku); await waitForConnections(numServiceNodes, waku);
const wakuConnections = waku.libp2p.getConnections();
if (wakuConnections.length < numServiceNodes) {
throw new Error(
`Expected at least ${numServiceNodes} connections for js-waku.`
);
}
return [serviceNodes, waku]; return [serviceNodes, waku];
} }


@ -6,6 +6,8 @@ import { ServiceNode } from "../lib/service_node.js";
const log = new Logger("test:teardown"); const log = new Logger("test:teardown");
const TEARDOWN_TIMEOUT = 10000; // 10 seconds timeout for teardown
export async function tearDownNodes( export async function tearDownNodes(
nwakuNodes: ServiceNode | ServiceNode[], nwakuNodes: ServiceNode | ServiceNode[],
wakuNodes: IWaku | IWaku[] wakuNodes: IWaku | IWaku[]
@ -13,37 +15,47 @@ export async function tearDownNodes(
const nNodes = Array.isArray(nwakuNodes) ? nwakuNodes : [nwakuNodes]; const nNodes = Array.isArray(nwakuNodes) ? nwakuNodes : [nwakuNodes];
const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes]; const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes];
const stopNwakuNodes = nNodes.map(async (nwaku) => { try {
if (nwaku) { // Use Promise.race to implement timeout
await pRetry( const teardownPromise = Promise.all([
async () => { ...nNodes.map(async (nwaku) => {
try { if (nwaku) {
await nwaku.stop(); await pRetry(
} catch (error) { async () => {
log.error("Nwaku failed to stop:", error); try {
throw error; await nwaku.stop();
} } catch (error) {
}, log.error("Nwaku failed to stop:", error);
{ retries: 3 } throw error;
); }
} },
}); { retries: 3, minTimeout: 1000 }
);
const stopWakuNodes = wNodes.map(async (waku) => { }
if (waku) { }),
await pRetry( ...wNodes.map(async (waku) => {
async () => { if (waku) {
try { try {
await waku.stop(); await waku.stop();
} catch (error) { } catch (error) {
log.error("Waku failed to stop:", error); log.error("Waku failed to stop:", error);
throw error;
} }
}, }
{ retries: 3 } })
); ]);
}
});
await Promise.all([...stopNwakuNodes, ...stopWakuNodes]); await Promise.race([
teardownPromise,
new Promise((_, reject) =>
setTimeout(
() => reject(new Error("Teardown timeout")),
TEARDOWN_TIMEOUT
)
)
]);
} catch (error) {
log.error("Teardown failed:", error);
// Force process cleanup if needed
process.exit(1);
}
} }


@@ -56,7 +56,7 @@ describe("DNS Node Discovery [live data]", function () {
   });

   it(`should use DNS peer discovery with light client`, async function () {
-    this.timeout(100000);
+    this.timeout(100_000);
     const maxQuantity = 3;

     const nodeRequirements = {


@ -23,11 +23,11 @@ import {
afterEachCustom, afterEachCustom,
beforeEachCustom, beforeEachCustom,
delay, delay,
makeLogFileName,
NOISE_KEY_1, NOISE_KEY_1,
NOISE_KEY_2, NOISE_KEY_2,
ServiceNode, runMultipleNodes,
tearDownNodes ServiceNodesFleet,
teardownNodesWithRedundancy
} from "../src/index.js"; } from "../src/index.js";
const log = new Logger("test:ephemeral"); const log = new Logger("test:ephemeral");
@ -76,41 +76,39 @@ const SymDecoder = createSymDecoder(SymContentTopic, symKey, PubsubTopic);
describe("Waku Message Ephemeral field", function () { describe("Waku Message Ephemeral field", function () {
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let serviceNodes: ServiceNodesFleet;
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku); await teardownNodesWithRedundancy(serviceNodes, waku);
}); });
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
nwaku = new ServiceNode(makeLogFileName(this.ctx)); [serviceNodes, waku] = await runMultipleNodes(
await nwaku.start({ this.ctx,
filter: true, {
lightpush: true, clusterId: ClusterId,
store: true, contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic]
relay: true, },
pubsubTopic: [PubsubTopic], {
contentTopic: [TestContentTopic, AsymContentTopic, SymContentTopic], filter: true,
clusterId: ClusterId lightpush: true,
}); store: true,
await nwaku.ensureSubscriptionsAutosharding([ relay: true,
TestContentTopic, pubsubTopic: [PubsubTopic]
AsymContentTopic, },
SymContentTopic true,
]); 2
);
waku = await createLightNode({ await Promise.all(
staticNoiseKey: NOISE_KEY_1, serviceNodes.nodes.map(async (node) => {
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, await node.ensureSubscriptionsAutosharding([
networkConfig: { TestContentTopic,
contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic], AsymContentTopic,
clusterId: ClusterId SymContentTopic
} ]);
}); })
await waku.start(); );
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
}); });
it("Ephemeral messages are not stored", async function () { it("Ephemeral messages are not stored", async function () {
@ -130,7 +128,7 @@ describe("Waku Message Ephemeral field", function () {
payload: utf8ToBytes(clearText) payload: utf8ToBytes(clearText)
}; };
const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ const [waku1, waku2] = await Promise.all([
createLightNode({ createLightNode({
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
networkConfig: { networkConfig: {
@ -144,16 +142,18 @@ describe("Waku Message Ephemeral field", function () {
contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic], contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic],
clusterId: ClusterId clusterId: ClusterId
} }
}).then((waku) => waku.start().then(() => waku)), }).then((waku) => waku.start().then(() => waku))
nwaku.getMultiaddrWithId()
]); ]);
log.info("Waku nodes created"); log.info("Waku nodes created");
await Promise.all([ await Promise.all(
waku1.dial(nimWakuMultiaddr), serviceNodes.nodes.map(async (node) => {
waku2.dial(nimWakuMultiaddr) const multiaddr = await node.getMultiaddrWithId();
]); await waku1.dial(multiaddr);
await waku2.dial(multiaddr);
})
);
log.info("Waku nodes connected to nwaku"); log.info("Waku nodes connected to nwaku");
@ -186,7 +186,7 @@ describe("Waku Message Ephemeral field", function () {
expect(messages?.length).eq(0); expect(messages?.length).eq(0);
await tearDownNodes([], [waku1, waku2]); await teardownNodesWithRedundancy(serviceNodes, [waku1, waku2]);
}); });
it("Ephemeral field is preserved - encoder v0", async function () { it("Ephemeral field is preserved - encoder v0", async function () {


@@ -102,8 +102,7 @@ const runTests = (strictCheckNodes: boolean): void => {
         expectedVersion: 1,
         expectedPubsubTopic: TestPubsubTopic
       });
-
-      await serviceNodes.confirmMessageLength(1);
+      await serviceNodes.confirmMessageLength(1, { encryptedPayload: true });
     });

     it("Subscribe and receive symmetrically encrypted messages via lightPush", async function () {
@@ -136,7 +135,7 @@ const runTests = (strictCheckNodes: boolean): void => {
         expectedPubsubTopic: TestPubsubTopic
       });

-      await serviceNodes.confirmMessageLength(1);
+      await serviceNodes.confirmMessageLength(1, { encryptedPayload: true });
     });

     it("Subscribe and receive messages via waku relay post", async function () {


@@ -50,7 +50,7 @@ const runTests = (strictNodeCheck: boolean): void => {
       const pushResponse = await waku.lightPush.send(TestEncoder, {
         payload: utf8ToBytes(testItem.value)
       });
-      expect(pushResponse.successes.length).to.eq(numServiceNodes);
+      expect(pushResponse.successes.length).to.equal(numServiceNodes);

       expect(
         await serviceNodes.messageCollector.waitForMessages(1, {
@ -65,24 +65,24 @@ const runTests = (strictNodeCheck: boolean): void => {
}); });
}); });
it("Push 30 different messages", async function () { it("Push 5 different messages", async function () {
const generateMessageText = (index: number): string => `M${index}`; const generateMessageText = (index: number): string => `M${index}`;
for (let i = 0; i < 30; i++) { for (let i = 0; i < 5; i++) {
const pushResponse = await waku.lightPush.send(TestEncoder, { const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(generateMessageText(i)) payload: utf8ToBytes(generateMessageText(i))
}); });
if (pushResponse.failures.length > 0) console.log(i + 1);
expect(pushResponse.successes.length).to.eq(numServiceNodes); expect(pushResponse.successes.length).to.equal(numServiceNodes);
} }
expect( expect(
await serviceNodes.messageCollector.waitForMessages(30, { await serviceNodes.messageCollector.waitForMessages(5, {
pubsubTopic: TestPubsubTopic pubsubTopic: TestPubsubTopic
}) })
).to.eq(true); ).to.eq(true);
for (let i = 0; i < 30; i++) { for (let i = 0; i < 5; i++) {
serviceNodes.messageCollector.verifyReceivedMessage(i, { serviceNodes.messageCollector.verifyReceivedMessage(i, {
expectedMessageText: generateMessageText(i), expectedMessageText: generateMessageText(i),
expectedContentTopic: TestContentTopic, expectedContentTopic: TestContentTopic,
@@ -119,7 +119,7 @@ const runTests = (strictNodeCheck: boolean): void => {
       customEncoder,
       messagePayload
     );
-    expect(pushResponse.successes.length).to.eq(numServiceNodes);
+    expect(pushResponse.successes.length).to.equal(numServiceNodes);

     expect(
       await serviceNodes.messageCollector.waitForMessages(1, {
@@ -156,7 +156,7 @@ const runTests = (strictNodeCheck: boolean): void => {
       customTestEncoder,
       messagePayload
     );
-    expect(pushResponse.successes.length).to.eq(numServiceNodes);
+    expect(pushResponse.successes.length).to.equal(numServiceNodes);

     expect(
       await serviceNodes.messageCollector.waitForMessages(1, {
@@ -190,7 +190,7 @@ const runTests = (strictNodeCheck: boolean): void => {
     );

     if (serviceNodes.type == "go-waku") {
-      expect(pushResponse.successes.length).to.eq(numServiceNodes);
+      expect(pushResponse.successes.length).to.equal(numServiceNodes);
       expect(
         await serviceNodes.messageCollector.waitForMessages(1, {
           pubsubTopic: TestPubsubTopic
@@ -229,7 +229,7 @@ const runTests = (strictNodeCheck: boolean): void => {
       payload: utf8ToBytes(messageText),
       rateLimitProof: rateLimitProof
     });
-    expect(pushResponse.successes.length).to.eq(numServiceNodes);
+    expect(pushResponse.successes.length).to.equal(numServiceNodes);

     expect(
       await serviceNodes.messageCollector.waitForMessages(1, {
@@ -253,7 +253,7 @@ const runTests = (strictNodeCheck: boolean): void => {
       payload: utf8ToBytes(messageText),
       timestamp: new Date(testItem)
     });
-    expect(pushResponse.successes.length).to.eq(numServiceNodes);
+    expect(pushResponse.successes.length).to.equal(numServiceNodes);

     expect(
       await serviceNodes.messageCollector.waitForMessages(1, {
@@ -274,7 +274,7 @@ const runTests = (strictNodeCheck: boolean): void => {
     const pushResponse = await waku.lightPush.send(TestEncoder, {
       payload: bigPayload
     });
-    expect(pushResponse.successes.length).to.greaterThan(0);
+    expect(pushResponse.successes.length).to.equal(numServiceNodes);
   });

   it("Fails to push message bigger that 1MB", async function () {


@ -7,7 +7,6 @@ import {
afterEachCustom, afterEachCustom,
beforeEachCustom, beforeEachCustom,
generateRandomUint8Array, generateRandomUint8Array,
MessageCollector,
ServiceNode, ServiceNode,
tearDownNodes, tearDownNodes,
TEST_STRING TEST_STRING
@ -22,16 +21,16 @@ import {
TestShardInfo TestShardInfo
} from "../utils.js"; } from "../utils.js";
describe("Waku Light Push: Single Node", function () { // These tests are expected to fail as service nodes now require at least one more connected node: https://github.com/waku-org/nwaku/pull/2951/files
describe("Waku Light Push: Single Node: Fails as expected", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level // Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(15000); this.timeout(15000);
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let nwaku: ServiceNode;
let messageCollector: MessageCollector;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo); [nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
messageCollector = new MessageCollector(nwaku);
await nwaku.ensureSubscriptions([TestPubsubTopic]); await nwaku.ensureSubscriptions([TestPubsubTopic]);
}); });
@ -45,18 +44,9 @@ describe("Waku Light Push: Single Node", function () {
const pushResponse = await waku.lightPush.send(TestEncoder, { const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(testItem.value) payload: utf8ToBytes(testItem.value)
}); });
expect(pushResponse.successes.length).to.eq(1); // Expect failure since node requires another connected node
expect(pushResponse.successes.length).to.eq(0);
expect( expect(pushResponse.failures?.length).to.be.greaterThan(0);
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: testItem.value,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
}); });
}); });
@ -67,73 +57,9 @@ describe("Waku Light Push: Single Node", function () {
const pushResponse = await waku.lightPush.send(TestEncoder, { const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(generateMessageText(i)) payload: utf8ToBytes(generateMessageText(i))
}); });
expect(pushResponse.successes.length).to.eq(1); // Expect failure since node requires another connected node
} expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.length).to.be.greaterThan(0);
expect(
await messageCollector.waitForMessages(30, {
pubsubTopic: TestPubsubTopic
})
).to.eq(true);
for (let i = 0; i < 30; i++) {
messageCollector.verifyReceivedMessage(i, {
expectedMessageText: generateMessageText(i),
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
}
});
it("Throws when trying to push message with empty payload", async function () {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: new Uint8Array()
});
expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
ProtocolError.EMPTY_PAYLOAD
);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(false);
});
TEST_STRING.forEach((testItem) => {
it(`Push message with content topic containing ${testItem.description}`, async function () {
const customEncoder = createEncoder({
contentTopic: testItem.value,
pubsubTopic: TestPubsubTopic
});
const pushResponse = await waku.lightPush.send(
customEncoder,
messagePayload
);
expect(pushResponse.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: testItem.value,
expectedPubsubTopic: TestPubsubTopic
});
});
});
it("Fails to push message with empty content topic", async function () {
try {
createEncoder({ contentTopic: "" });
expect.fail("Expected an error but didn't get one");
} catch (error) {
expect((error as Error).message).to.equal(
"Content topic must be specified"
);
} }
}); });
@ -148,62 +74,19 @@ describe("Waku Light Push: Single Node", function () {
customTestEncoder, customTestEncoder,
messagePayload messagePayload
); );
expect(pushResponse.successes.length).to.eq(1); expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.length).to.be.greaterThan(0);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
}); });
it("Fails to push message with large meta", async function () { it("Fails to push message with empty payload", async function () {
const customTestEncoder = createEncoder({ const pushResponse = await waku.lightPush.send(TestEncoder, {
contentTopic: TestContentTopic, payload: new Uint8Array()
pubsubTopic: TestPubsubTopic,
metaSetter: () => new Uint8Array(105024) // see the note below ***
}); });
// *** note: this test used 10 ** 6 when `nwaku` node had MaxWakuMessageSize == 1MiB ( 1*2^20 .) expect(pushResponse.successes.length).to.eq(0);
// `nwaku` establishes the max lightpush msg size as `const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024` expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
// see: https://github.com/waku-org/nwaku/blob/07beea02095035f4f4c234ec2dec1f365e6955b8/waku/waku_lightpush/rpc_codec.nim#L15 ProtocolError.EMPTY_PAYLOAD
// In the PR https://github.com/waku-org/nwaku/pull/2298 we reduced the MaxWakuMessageSize
// from 1MiB to 150KiB. Therefore, the 105024 number comes from substracting ( 1*2^20 - 150*2^10 )
// to the original 10^6 that this test had when MaxWakuMessageSize == 1*2^20
const pushResponse = await waku.lightPush.send(
customTestEncoder,
messagePayload
); );
if (nwaku.type == "go-waku") {
expect(pushResponse.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
} else {
expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
ProtocolError.REMOTE_PEER_REJECTED
);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(false);
}
}); });
it("Push message with rate limit", async function () { it("Push message with rate limit", async function () {
@ -221,18 +104,8 @@ describe("Waku Light Push: Single Node", function () {
payload: utf8ToBytes(messageText), payload: utf8ToBytes(messageText),
rateLimitProof: rateLimitProof rateLimitProof: rateLimitProof
}); });
expect(pushResponse.successes.length).to.eq(1); expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.length).to.be.greaterThan(0);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
}); });
[ [
@ -245,19 +118,8 @@ describe("Waku Light Push: Single Node", function () {
payload: utf8ToBytes(messageText), payload: utf8ToBytes(messageText),
timestamp: new Date(testItem) timestamp: new Date(testItem)
}); });
expect(pushResponse.successes.length).to.eq(1); expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.length).to.be.greaterThan(0);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedTimestamp: testItem,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
}); });
}); });
@ -266,7 +128,8 @@ describe("Waku Light Push: Single Node", function () {
const pushResponse = await waku.lightPush.send(TestEncoder, { const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: bigPayload payload: bigPayload
}); });
expect(pushResponse.successes.length).to.greaterThan(0); expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.length).to.be.greaterThan(0);
}); });
it("Fails to push message bigger that 1MB", async function () { it("Fails to push message bigger that 1MB", async function () {
@ -279,10 +142,5 @@ describe("Waku Light Push: Single Node", function () {
expect(pushResponse.failures?.map((failure) => failure.error)).to.include( expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
ProtocolError.SIZE_TOO_BIG ProtocolError.SIZE_TOO_BIG
); );
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(false);
}); });
}); });


@ -16,24 +16,20 @@ import {
} from "@waku/utils"; } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes"; import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai"; import { expect } from "chai";
import { Context } from "mocha";
import { import {
afterEachCustom, afterEachCustom,
beforeEachCustom, beforeEachCustom,
makeLogFileName, runMultipleNodes,
MessageCollector, ServiceNodesFleet,
ServiceNode,
tearDownNodes tearDownNodes
} from "../../../src/index.js"; } from "../../../src/index.js";
import { messageText, runNodes } from "../utils.js"; import { messageText } from "../utils.js";
describe("Waku Light Push : Multiple PubsubTopics", function () { describe("Waku Light Push : Multiple PubsubTopics", function () {
this.timeout(30000); this.timeout(30000);
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let serviceNodes: ServiceNodesFleet;
let nwaku2: ServiceNode;
let messageCollector: MessageCollector;
const shardInfo: ShardInfo = { clusterId: 3, shards: [1, 2] }; const shardInfo: ShardInfo = { clusterId: 3, shards: [1, 2] };
const singleShardInfo1: SingleShardInfo = { clusterId: 3, shard: 1 }; const singleShardInfo1: SingleShardInfo = { clusterId: 3, shard: 1 };
@ -55,13 +51,19 @@ describe("Waku Light Push : Multiple PubsubTopics", function () {
let node1PeerId: PeerId; let node1PeerId: PeerId;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo); [serviceNodes, waku] = await runMultipleNodes(
messageCollector = new MessageCollector(nwaku); this.ctx,
node1PeerId = await nwaku.getPeerId(); shardInfo,
undefined,
true,
2,
true
);
node1PeerId = await serviceNodes.nodes[0].getPeerId();
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], waku); await tearDownNodes(serviceNodes.nodes, waku);
}); });
it("Push message on custom pubsubTopic", async function () { it("Push message on custom pubsubTopic", async function () {
@ -72,11 +74,11 @@ describe("Waku Light Push : Multiple PubsubTopics", function () {
expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString());
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic1 pubsubTopic: customPubsubTopic1
}) })
).to.eq(true); ).to.eq(true);
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText, expectedMessageText: messageText,
expectedContentTopic: customContentTopic1 expectedContentTopic: customContentTopic1
}); });
@ -92,50 +94,48 @@ describe("Waku Light Push : Multiple PubsubTopics", function () {
expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString());
expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString());
const messageCollector2 = new MessageCollector(nwaku);
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic1 pubsubTopic: customPubsubTopic1
}) })
).to.eq(true); ).to.eq(true);
expect( expect(
await messageCollector2.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic2 pubsubTopic: customPubsubTopic2
}) })
).to.eq(true); ).to.eq(true);
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1", expectedMessageText: "M1",
expectedContentTopic: customContentTopic1, expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1 expectedPubsubTopic: customPubsubTopic1
}); });
messageCollector2.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2", expectedMessageText: "M2",
expectedContentTopic: customContentTopic2, expectedContentTopic: customContentTopic2,
expectedPubsubTopic: customPubsubTopic2 expectedPubsubTopic: customPubsubTopic2
}); });
}); });
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { it("Light push messages to 2 service nodes each with different pubsubtopics", async function () {
// Set up and start a new nwaku node with Default PubsubTopic const [serviceNodes2, waku2] = await runMultipleNodes(
nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); this.ctx,
await nwaku2.start({ {
filter: true, clusterId: singleShardInfo2.clusterId,
lightpush: true, shards: [singleShardInfo2.shard!]
relay: true, },
pubsubTopic: [singleShardInfoToPubsubTopic(singleShardInfo2)], undefined,
clusterId: singleShardInfo2.clusterId true,
}); 1
await nwaku2.ensureSubscriptions([ );
await serviceNodes2.nodes[0].ensureSubscriptions([
singleShardInfoToPubsubTopic(singleShardInfo2) singleShardInfoToPubsubTopic(singleShardInfo2)
]); ]);
await waku.dial(await nwaku2.getMultiaddrWithId()); await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]); await waku.waitForPeers([Protocols.LightPush]);
const messageCollector2 = new MessageCollector(nwaku2);
await waku.lightPush.send(customEncoder1, { await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1") payload: utf8ToBytes("M1")
}); });
@ -143,33 +143,33 @@ describe("Waku Light Push : Multiple PubsubTopics", function () {
payload: utf8ToBytes("M2") payload: utf8ToBytes("M2")
}); });
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic1 pubsubTopic: customPubsubTopic1
}); });
await serviceNodes2.messageCollector.waitForMessages(1, {
await messageCollector2.waitForMessages(1, {
pubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) pubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2)
}); });
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1", expectedMessageText: "M1",
expectedContentTopic: customContentTopic1, expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1 expectedPubsubTopic: customPubsubTopic1
}); });
messageCollector2.verifyReceivedMessage(0, { serviceNodes2.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M2", expectedMessageText: "M2",
expectedContentTopic: customContentTopic2, expectedContentTopic: customContentTopic2,
expectedPubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) expectedPubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2)
}); });
// Clean up second fleet
await tearDownNodes(serviceNodes2.nodes, waku2);
}); });
}); });
describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
this.timeout(30000); this.timeout(30000);
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let serviceNodes: ServiceNodesFleet;
let nwaku2: ServiceNode;
let messageCollector: MessageCollector;
const clusterId = 4; const clusterId = 4;
const customContentTopic1 = "/waku/2/content/test.js"; const customContentTopic1 = "/waku/2/content/test.js";
@ -198,13 +198,19 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
let node1PeerId: PeerId; let node1PeerId: PeerId;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo); [serviceNodes, waku] = await runMultipleNodes(
messageCollector = new MessageCollector(nwaku); this.ctx,
node1PeerId = await nwaku.getPeerId(); shardInfo,
undefined,
true,
2,
true
);
node1PeerId = await serviceNodes.nodes[0].getPeerId();
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], waku); await tearDownNodes(serviceNodes.nodes, waku);
}); });
it("Push message on custom pubsubTopic", async function () { it("Push message on custom pubsubTopic", async function () {
@ -212,15 +218,14 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
payload: utf8ToBytes(messageText) payload: utf8ToBytes(messageText)
}); });
expect(pushResponse.failures).to.be.empty;
expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString());
expect( expect(
await messageCollector.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessages(1, {
contentTopic: customContentTopic1 pubsubTopic: autoshardingPubsubTopic1
}) })
).to.eq(true); ).to.eq(true);
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText, expectedMessageText: messageText,
expectedContentTopic: customContentTopic1 expectedContentTopic: customContentTopic1
}); });
@ -236,47 +241,45 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString());
expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString());
const messageCollector2 = new MessageCollector(nwaku);
expect( expect(
await messageCollector.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessages(1, {
contentTopic: customContentTopic1 pubsubTopic: autoshardingPubsubTopic1
}) })
).to.eq(true); ).to.eq(true);
expect( expect(
await messageCollector2.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessages(1, {
contentTopic: customContentTopic2 pubsubTopic: autoshardingPubsubTopic2
}) })
).to.eq(true); ).to.eq(true);
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1", expectedMessageText: "M1",
expectedContentTopic: customContentTopic1, expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1 expectedPubsubTopic: autoshardingPubsubTopic1
}); });
messageCollector2.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2", expectedMessageText: "M2",
expectedContentTopic: customContentTopic2, expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2 expectedPubsubTopic: autoshardingPubsubTopic2
}); });
}); });
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { it("Light push messages to 2 service nodes each with different pubsubtopics", async function () {
// Set up and start a new nwaku node with Default PubsubTopic // Create a second fleet for the second pubsub topic
nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); const [serviceNodes2, waku2] = await runMultipleNodes(
await nwaku2.start({ this.ctx,
filter: true, { clusterId, contentTopics: [customContentTopic2] },
lightpush: true, undefined,
relay: true, true,
pubsubTopic: [autoshardingPubsubTopic2], 1 // Only need one node for second fleet
clusterId: shardInfo.clusterId );
});
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
const messageCollector2 = new MessageCollector(nwaku2); await serviceNodes2.nodes[0].ensureSubscriptionsAutosharding([
customContentTopic2
]);
await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
await waku.lightPush.send(customEncoder1, { await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1") payload: utf8ToBytes("M1")
@ -285,34 +288,33 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
payload: utf8ToBytes("M2") payload: utf8ToBytes("M2")
}); });
await messageCollector.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1 contentTopic: customContentTopic1
}); });
await messageCollector2.waitForMessagesAutosharding(1, { await serviceNodes2.messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic2 contentTopic: customContentTopic2
}); });
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1", expectedMessageText: "M1",
expectedContentTopic: customContentTopic1, expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1 expectedPubsubTopic: autoshardingPubsubTopic1
}); });
messageCollector2.verifyReceivedMessage(0, { serviceNodes2.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M2", expectedMessageText: "M2",
expectedContentTopic: customContentTopic2, expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2 expectedPubsubTopic: autoshardingPubsubTopic2
}); });
// Clean up second fleet
await tearDownNodes(serviceNodes2.nodes, waku2);
}); });
}); });
describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () { describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () {
this.timeout(30000); this.timeout(30000);
let waku: LightNode; let waku: LightNode;
let waku2: LightNode; let serviceNodes: ServiceNodesFleet;
let nwaku: ServiceNode;
let nwaku2: ServiceNode;
let messageCollector: MessageCollector;
let ctx: Context;
const clusterId = 3; const clusterId = 3;
const customContentTopic1 = "/waku/2/content/utf8"; const customContentTopic1 = "/waku/2/content/utf8";
@ -355,14 +357,19 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
let node1PeerId: PeerId; let node1PeerId: PeerId;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
ctx = this.ctx; [serviceNodes, waku] = await runMultipleNodes(
[nwaku, waku] = await runNodes(ctx, testShardInfo); this.ctx,
messageCollector = new MessageCollector(nwaku); testShardInfo,
node1PeerId = await nwaku.getPeerId(); undefined,
true,
2,
true
);
node1PeerId = await serviceNodes.nodes[0].getPeerId();
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], [waku, waku2]); await tearDownNodes(serviceNodes.nodes, waku);
}); });
it("Push message on custom pubsubTopic", async function () { it("Push message on custom pubsubTopic", async function () {
@ -373,11 +380,11 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString());
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic1 pubsubTopic: autoshardingPubsubTopic1
}) })
).to.eq(true); ).to.eq(true);
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText, expectedMessageText: messageText,
expectedContentTopic: customContentTopic1 expectedContentTopic: customContentTopic1
}); });
@ -393,68 +400,71 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString());
expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString());
const messageCollector2 = new MessageCollector(nwaku);
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic1 pubsubTopic: autoshardingPubsubTopic1
}) })
).to.eq(true); ).to.eq(true);
expect( expect(
await messageCollector2.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic2 pubsubTopic: autoshardingPubsubTopic2
}) })
).to.eq(true); ).to.eq(true);
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1", expectedMessageText: "M1",
expectedContentTopic: customContentTopic1, expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1 expectedPubsubTopic: autoshardingPubsubTopic1
}); });
messageCollector2.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2", expectedMessageText: "M2",
expectedContentTopic: customContentTopic2, expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2 expectedPubsubTopic: autoshardingPubsubTopic2
}); });
}); });
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { it("Light push messages to 2 service nodes each with different pubsubtopics", async function () {
// Set up and start a new nwaku node with Default PubsubTopic const [serviceNodes2, waku2] = await runMultipleNodes(
[nwaku2] = await runNodes(ctx, shardInfo2); this.ctx,
shardInfo2,
undefined,
true,
1
);
await nwaku2.ensureSubscriptions([autoshardingPubsubTopic2]); await serviceNodes2.nodes[0].ensureSubscriptions([
await waku.dial(await nwaku2.getMultiaddrWithId()); autoshardingPubsubTopic2
]);
await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]); await waku.waitForPeers([Protocols.LightPush]);
const messageCollector2 = new MessageCollector(nwaku2); await waku.lightPush.send(customEncoder1, {
const { failures: f1 } = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1") payload: utf8ToBytes("M1")
}); });
const { failures: f2 } = await waku.lightPush.send(customEncoder2, { await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2") payload: utf8ToBytes("M2")
}); });
expect(f1).to.be.empty; await serviceNodes.messageCollector.waitForMessages(1, {
expect(f2).to.be.empty;
await messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic1 pubsubTopic: autoshardingPubsubTopic1
}); });
await messageCollector2.waitForMessages(1, { await serviceNodes2.messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic2 pubsubTopic: autoshardingPubsubTopic2
}); });
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1", expectedMessageText: "M1",
expectedContentTopic: customContentTopic1, expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1 expectedPubsubTopic: autoshardingPubsubTopic1
}); });
messageCollector2.verifyReceivedMessage(0, { serviceNodes2.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M2", expectedMessageText: "M2",
expectedContentTopic: customContentTopic2, expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2 expectedPubsubTopic: autoshardingPubsubTopic2
}); });
// Clean up second fleet
await tearDownNodes(serviceNodes2.nodes, waku2);
}); });
}); });
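Note: the runMultipleNodes helper used throughout these tests is not part of this diff. Reconstructed from the call sites above, its signature appears to be roughly the sketch below; the parameter names and the meaning of the trailing positional arguments (false, 2, true) are assumptions, not the helper's actual definition.

```typescript
// Hypothetical signature inferred from call sites such as
// runMultipleNodes(this.ctx, shardInfo2, undefined, true, 1) and
// runMultipleNodes(this.ctx, config, nwakuArgs, false, 2, true).
import type { Context } from "mocha";
import type { LightNode, ShardInfo } from "@waku/interfaces";
import type { ServiceNodesFleet } from "../../src/index.js";

type AssumedNetworkConfig =
  | ShardInfo
  | { clusterId: number; contentTopics: string[] };

declare function runMultipleNodes(
  ctx: Context,
  networkConfig: AssumedNetworkConfig,
  nwakuArgs?: Record<string, unknown>, // extra nwaku flags: store, lightpush, relay, pubsubTopic, ...
  strictChecking?: boolean,            // assumed meaning of the 4th argument
  numServiceNodes?: number,            // 2 in most tests here, 1 for the second fleet above
  withRelayMesh?: boolean              // assumed: interconnect the fleet over relay
): Promise<[ServiceNodesFleet, LightNode]>;
```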

View File

@ -1,4 +1,4 @@
import { LightNode, ProtocolError, Protocols } from "@waku/interfaces"; import { LightNode, ProtocolError } from "@waku/interfaces";
import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk";
import { import {
contentTopicToPubsubTopic, contentTopicToPubsubTopic,
@ -8,10 +8,8 @@ import { expect } from "chai";
import { import {
afterEachCustom, afterEachCustom,
beforeEachCustom, runMultipleNodes,
makeLogFileName, ServiceNodesFleet,
MessageCollector,
ServiceNode,
tearDownNodes tearDownNodes
} from "../../src/index.js"; } from "../../src/index.js";
@ -22,41 +20,33 @@ describe("Autosharding: Running Nodes", function () {
this.timeout(50000); this.timeout(50000);
const clusterId = 10; const clusterId = 10;
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let serviceNodes: ServiceNodesFleet;
let messageCollector: MessageCollector;
beforeEachCustom(this, async () => {
nwaku = new ServiceNode(makeLogFileName(this.ctx));
messageCollector = new MessageCollector(nwaku);
});
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku); await tearDownNodes(serviceNodes.nodes, waku);
}); });
describe("Different clusters and topics", function () { describe("Different clusters and topics", function () {
// js-waku allows autosharding for cluster IDs different than 1
it("Cluster ID 0 - Default/Global Cluster", async function () { it("Cluster ID 0 - Default/Global Cluster", async function () {
const clusterId = 0; const clusterId = 0;
const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)];
await nwaku.start({
store: true,
lightpush: true,
relay: true,
clusterId: clusterId,
pubsubTopic: pubsubTopics
});
await nwaku.ensureSubscriptions(pubsubTopics); [serviceNodes, waku] = await runMultipleNodes(
this.ctx,
waku = await createLightNode({ {
networkConfig: { clusterId,
clusterId: clusterId,
contentTopics: [ContentTopic] contentTopics: [ContentTopic]
} },
}); {
await waku.dial(await nwaku.getMultiaddrWithId()); store: true,
await waku.waitForPeers([Protocols.LightPush]); lightpush: true,
relay: true,
pubsubTopic: pubsubTopics
},
false,
2,
true
);
const encoder = createEncoder({ const encoder = createEncoder({
contentTopic: ContentTopic, contentTopic: ContentTopic,
@ -70,9 +60,9 @@ describe("Autosharding: Running Nodes", function () {
payload: utf8ToBytes("Hello World") payload: utf8ToBytes("Hello World")
}); });
expect(request.successes.length).to.eq(1); expect(request.successes.length).to.eq(2); // Expect 2 successes for 2 nodes
expect( expect(
await messageCollector.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessagesAutosharding(1, {
contentTopic: ContentTopic contentTopic: ContentTopic
}) })
).to.eq(true); ).to.eq(true);
@ -81,24 +71,23 @@ describe("Autosharding: Running Nodes", function () {
it("Non TWN Cluster", async function () { it("Non TWN Cluster", async function () {
const clusterId = 5; const clusterId = 5;
const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)];
await nwaku.start({
store: true,
lightpush: true,
relay: true,
clusterId: clusterId,
pubsubTopic: pubsubTopics
});
await nwaku.ensureSubscriptions(pubsubTopics); [serviceNodes, waku] = await runMultipleNodes(
this.ctx,
waku = await createLightNode({ {
networkConfig: { clusterId,
clusterId: clusterId,
contentTopics: [ContentTopic] contentTopics: [ContentTopic]
} },
}); {
await waku.dial(await nwaku.getMultiaddrWithId()); store: true,
await waku.waitForPeers([Protocols.LightPush]); lightpush: true,
relay: true,
pubsubTopic: pubsubTopics
},
false,
2,
true
);
const encoder = createEncoder({ const encoder = createEncoder({
contentTopic: ContentTopic, contentTopic: ContentTopic,
@ -112,9 +101,9 @@ describe("Autosharding: Running Nodes", function () {
payload: utf8ToBytes("Hello World") payload: utf8ToBytes("Hello World")
}); });
expect(request.successes.length).to.eq(1); expect(request.successes.length).to.eq(2); // Expect 2 successes for 2 nodes
expect( expect(
await messageCollector.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessagesAutosharding(1, {
contentTopic: ContentTopic contentTopic: ContentTopic
}) })
).to.eq(true); ).to.eq(true);
@ -138,24 +127,23 @@ describe("Autosharding: Running Nodes", function () {
contentTopicToPubsubTopic(ContentTopic, clusterId) contentTopicToPubsubTopic(ContentTopic, clusterId)
]; ];
await nwaku.start({ [serviceNodes, waku] = await runMultipleNodes(
store: true, this.ctx,
lightpush: true, {
relay: true, clusterId,
clusterId: clusterId,
pubsubTopic: pubsubTopics,
contentTopic: [ContentTopic]
});
waku = await createLightNode({
networkConfig: {
clusterId: clusterId,
contentTopics: [ContentTopic] contentTopics: [ContentTopic]
} },
}); {
store: true,
await waku.dial(await nwaku.getMultiaddrWithId()); lightpush: true,
await waku.waitForPeers([Protocols.LightPush]); relay: true,
pubsubTopic: pubsubTopics,
contentTopic: [ContentTopic]
},
false,
2,
true
);
const encoder = createEncoder({ const encoder = createEncoder({
contentTopic: ContentTopic, contentTopic: ContentTopic,
@ -169,9 +157,9 @@ describe("Autosharding: Running Nodes", function () {
payload: utf8ToBytes("Hello World") payload: utf8ToBytes("Hello World")
}); });
expect(request.successes.length).to.eq(1); expect(request.successes.length).to.eq(2); // Expect 2 successes for 2 nodes
expect( expect(
await messageCollector.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessagesAutosharding(1, {
contentTopic: ContentTopic contentTopic: ContentTopic
}) })
).to.eq(true); ).to.eq(true);
@ -201,24 +189,23 @@ describe("Autosharding: Running Nodes", function () {
contentTopicToPubsubTopic(ContentTopic2, clusterId) contentTopicToPubsubTopic(ContentTopic2, clusterId)
]; ];
await nwaku.start({ [serviceNodes, waku] = await runMultipleNodes(
store: true, this.ctx,
lightpush: true, {
relay: true, clusterId,
clusterId: clusterId,
pubsubTopic: pubsubTopics,
contentTopic: [ContentTopic, ContentTopic2]
});
waku = await createLightNode({
networkConfig: {
clusterId: clusterId,
// For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards
contentTopics: [ContentTopic, ContentTopic2] contentTopics: [ContentTopic, ContentTopic2]
} },
}); {
await waku.dial(await nwaku.getMultiaddrWithId()); store: true,
await waku.waitForPeers([Protocols.LightPush]); lightpush: true,
relay: true,
pubsubTopic: pubsubTopics,
contentTopic: [ContentTopic, ContentTopic2]
},
false,
2,
true
);
const encoder1 = createEncoder({ const encoder1 = createEncoder({
contentTopic: ContentTopic, contentTopic: ContentTopic,
@ -239,9 +226,9 @@ describe("Autosharding: Running Nodes", function () {
const request1 = await waku.lightPush.send(encoder1, { const request1 = await waku.lightPush.send(encoder1, {
payload: utf8ToBytes("Hello World") payload: utf8ToBytes("Hello World")
}); });
expect(request1.successes.length).to.eq(1); expect(request1.successes.length).to.eq(2); // Expect 2 successes for 2 nodes
expect( expect(
await messageCollector.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessagesAutosharding(1, {
contentTopic: ContentTopic contentTopic: ContentTopic
}) })
).to.eq(true); ).to.eq(true);
@ -249,9 +236,9 @@ describe("Autosharding: Running Nodes", function () {
const request2 = await waku.lightPush.send(encoder2, { const request2 = await waku.lightPush.send(encoder2, {
payload: utf8ToBytes("Hello World") payload: utf8ToBytes("Hello World")
}); });
expect(request2.successes.length).to.eq(1); expect(request2.successes.length).to.eq(2); // Expect 2 successes for 2 nodes
expect( expect(
await messageCollector.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessagesAutosharding(1, {
contentTopic: ContentTopic contentTopic: ContentTopic
}) })
).to.eq(true); ).to.eq(true);
@ -259,21 +246,24 @@ describe("Autosharding: Running Nodes", function () {
it("using a protocol with unconfigured pubsub topic should fail", async function () { it("using a protocol with unconfigured pubsub topic should fail", async function () {
const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)];
await nwaku.start({
store: true,
lightpush: true,
relay: true,
clusterId: clusterId,
pubsubTopic: pubsubTopics,
contentTopic: [ContentTopic]
});
waku = await createLightNode({ [serviceNodes, waku] = await runMultipleNodes(
networkConfig: { this.ctx,
clusterId: clusterId, {
clusterId,
contentTopics: [ContentTopic] contentTopics: [ContentTopic]
} },
}); {
store: true,
lightpush: true,
relay: true,
pubsubTopic: pubsubTopics,
contentTopic: [ContentTopic]
},
false,
2,
true
);
// use a content topic that is not configured // use a content topic that is not configured
const encoder = createEncoder({ const encoder = createEncoder({
@ -288,17 +278,17 @@ describe("Autosharding: Running Nodes", function () {
payload: utf8ToBytes("Hello World") payload: utf8ToBytes("Hello World")
}); });
if (successes.length > 0 || failures?.length === 0) { if (successes.length > 0 || !failures?.length) {
throw new Error("The request should've thrown an error"); throw new Error("The request should've thrown an error");
} }
const errors = failures?.map((failure) => failure.error); const errors = failures.map((failure) => failure.error);
expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED);
}); });
it("start node with empty content topic", async function () { it("start node with empty content topic", async function () {
try { try {
waku = await createLightNode({ await createLightNode({
networkConfig: { networkConfig: {
clusterId: clusterId, clusterId: clusterId,
contentTopics: [] contentTopics: []

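The autosharding tests above derive each pubsub topic from a content topic with contentTopicToPubsubTopic. A minimal sketch of that mapping, assuming the helper's default shard count and using example content topics taken from elsewhere in this suite:

```typescript
// Minimal sketch of the content-topic to shard mapping these tests rely on.
// The default shard count per cluster is assumed here.
import { contentTopicToPubsubTopic } from "@waku/utils";

const clusterId = 10;
const ContentTopic = "/waku/2/content/test.js";
const ContentTopic2 = "/myapp/1/latest/proto";

// Each call yields a sharded topic of the form "/waku/2/rs/10/<shard>",
// where <shard> is derived by hashing the content topic; two content
// topics may therefore land on different pubsub topics.
console.log(contentTopicToPubsubTopic(ContentTopic, clusterId));
console.log(contentTopicToPubsubTopic(ContentTopic2, clusterId));
```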
View File

@ -1,7 +1,6 @@
import { import {
LightNode, LightNode,
ProtocolError, ProtocolError,
Protocols,
ShardInfo, ShardInfo,
SingleShardInfo SingleShardInfo
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -15,28 +14,20 @@ import { expect } from "chai";
import { import {
afterEachCustom, afterEachCustom,
beforeEachCustom, runMultipleNodes,
makeLogFileName, ServiceNodesFleet,
MessageCollector,
ServiceNode,
tearDownNodes tearDownNodes
} from "../../src/index.js"; } from "../../src/index.js";
const ContentTopic = "/waku/2/content/test.js"; const ContentTopic = "/waku/2/content/test.js";
describe("Static Sharding: Running Nodes", function () { describe("Static Sharding: Running Nodes", function () {
this.timeout(15_000); this.timeout(60_000);
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let serviceNodes: ServiceNodesFleet;
let messageCollector: MessageCollector;
beforeEachCustom(this, async () => {
nwaku = new ServiceNode(makeLogFileName(this.ctx));
messageCollector = new MessageCollector(nwaku);
});
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku); await tearDownNodes(serviceNodes.nodes, waku);
}); });
describe("Different clusters and shards", function () { describe("Different clusters and shards", function () {
@ -44,20 +35,20 @@ describe("Static Sharding: Running Nodes", function () {
const singleShardInfo = { clusterId: 0, shard: 0 }; const singleShardInfo = { clusterId: 0, shard: 0 };
const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
await nwaku.start({ [serviceNodes, waku] = await runMultipleNodes(
store: true, this.ctx,
lightpush: true, shardInfo,
relay: true, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo) store: true,
}); lightpush: true,
relay: true,
await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo)); pubsubTopic: shardInfoToPubsubTopics(shardInfo),
clusterId: singleShardInfo.clusterId
waku = await createLightNode({ },
networkConfig: shardInfo false,
}); 2,
await waku.dial(await nwaku.getMultiaddrWithId()); true
await waku.waitForPeers([Protocols.LightPush]); );
const encoder = createEncoder({ const encoder = createEncoder({
contentTopic: ContentTopic, contentTopic: ContentTopic,
@ -71,33 +62,32 @@ describe("Static Sharding: Running Nodes", function () {
payload: utf8ToBytes("Hello World") payload: utf8ToBytes("Hello World")
}); });
expect(request.successes.length).to.eq(1); expect(request.successes.length).to.eq(2);
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0]
}) })
).to.eq(true); ).to.eq(true);
}); });
// dedicated test for Default Cluster ID 0
it("Cluster ID 0 - Default/Global Cluster", async function () { it("Cluster ID 0 - Default/Global Cluster", async function () {
const singleShardInfo = { clusterId: 0, shard: 1 }; const singleShardInfo = { clusterId: 0, shard: 1 };
const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
await nwaku.start({ [serviceNodes, waku] = await runMultipleNodes(
store: true, this.ctx,
lightpush: true, shardInfo,
relay: true, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo) store: true,
}); lightpush: true,
relay: true,
await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo)); pubsubTopic: shardInfoToPubsubTopics(shardInfo),
clusterId: singleShardInfo.clusterId
waku = await createLightNode({ },
networkConfig: shardInfo false,
}); 2,
await waku.dial(await nwaku.getMultiaddrWithId()); true
await waku.waitForPeers([Protocols.LightPush]); );
const encoder = createEncoder({ const encoder = createEncoder({
contentTopic: ContentTopic, contentTopic: ContentTopic,
@ -108,9 +98,9 @@ describe("Static Sharding: Running Nodes", function () {
payload: utf8ToBytes("Hello World") payload: utf8ToBytes("Hello World")
}); });
expect(request.successes.length).to.eq(1); expect(request.successes.length).to.eq(2);
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0]
}) })
).to.eq(true); ).to.eq(true);
@ -120,33 +110,29 @@ describe("Static Sharding: Running Nodes", function () {
for (let i = 0; i < numTest; i++) { for (let i = 0; i < numTest; i++) {
// Random clusterId between 2 and 1000 // Random clusterId between 2 and 1000
const clusterId = Math.floor(Math.random() * 999) + 2; const clusterId = Math.floor(Math.random() * 999) + 2;
// Random shardId between 1 and 1000 // Random shardId between 1 and 1000
const shardId = Math.floor(Math.random() * 1000) + 1; const shardId = Math.floor(Math.random() * 1000) + 1;
it(`random static sharding ${ it(`random static sharding ${
i + 1 i + 1
} - Cluster ID: ${clusterId}, Shard ID: ${shardId}`, async function () { } - Cluster ID: ${clusterId}, Shard ID: ${shardId}`, async function () {
afterEach(async () => {
await tearDownNodes(nwaku, waku);
});
const singleShardInfo = { clusterId: clusterId, shard: shardId }; const singleShardInfo = { clusterId: clusterId, shard: shardId };
const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
await nwaku.start({ [serviceNodes, waku] = await runMultipleNodes(
store: true, this.ctx,
lightpush: true, shardInfo,
relay: true, {
clusterId: clusterId, store: true,
pubsubTopic: shardInfoToPubsubTopics(shardInfo) lightpush: true,
}); relay: true,
clusterId: clusterId,
waku = await createLightNode({ pubsubTopic: shardInfoToPubsubTopics(shardInfo)
networkConfig: shardInfo },
}); false,
await waku.dial(await nwaku.getMultiaddrWithId()); 2,
await waku.waitForPeers([Protocols.LightPush]); true
);
const encoder = createEncoder({ const encoder = createEncoder({
contentTopic: ContentTopic, contentTopic: ContentTopic,
@ -157,9 +143,9 @@ describe("Static Sharding: Running Nodes", function () {
payload: utf8ToBytes("Hello World") payload: utf8ToBytes("Hello World")
}); });
expect(request.successes.length).to.eq(1); expect(request.successes.length).to.eq(2);
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0]
}) })
).to.eq(true); ).to.eq(true);
@ -169,12 +155,14 @@ describe("Static Sharding: Running Nodes", function () {
describe("Others", function () { describe("Others", function () {
const clusterId = 2; const clusterId = 2;
let shardInfo: ShardInfo;
const shardInfoFirstShard: ShardInfo = { const shardInfoFirstShard: ShardInfo = {
clusterId: clusterId, clusterId: clusterId,
shards: [2] shards: [2]
}; };
const shardInfoSecondShard: ShardInfo = {
clusterId: clusterId,
shards: [3]
};
const shardInfoBothShards: ShardInfo = { const shardInfoBothShards: ShardInfo = {
clusterId: clusterId, clusterId: clusterId,
shards: [2, 3] shards: [2, 3]
@ -188,31 +176,21 @@ describe("Static Sharding: Running Nodes", function () {
shard: 3 shard: 3
}; };
beforeEachCustom(this, async () => {
shardInfo = {
clusterId: clusterId,
shards: [2]
};
await nwaku.start({
store: true,
lightpush: true,
relay: true,
clusterId: clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo)
});
});
afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku);
});
it("configure the node with multiple pubsub topics", async function () { it("configure the node with multiple pubsub topics", async function () {
waku = await createLightNode({ [serviceNodes, waku] = await runMultipleNodes(
networkConfig: shardInfoBothShards this.ctx,
}); shardInfoBothShards,
await waku.dial(await nwaku.getMultiaddrWithId()); {
await waku.waitForPeers([Protocols.LightPush]); store: true,
lightpush: true,
relay: true,
clusterId: clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfoBothShards)
},
false,
2,
true
);
const encoder1 = createEncoder({ const encoder1 = createEncoder({
contentTopic: ContentTopic, contentTopic: ContentTopic,
@ -227,29 +205,40 @@ describe("Static Sharding: Running Nodes", function () {
const request1 = await waku.lightPush.send(encoder1, { const request1 = await waku.lightPush.send(encoder1, {
payload: utf8ToBytes("Hello World2") payload: utf8ToBytes("Hello World2")
}); });
expect(request1.successes.length).to.eq(1); expect(request1.successes.length).to.eq(2);
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] pubsubTopic: shardInfoToPubsubTopics(shardInfoFirstShard)[0]
}) })
).to.eq(true); ).to.eq(true);
const request2 = await waku.lightPush.send(encoder2, { const request2 = await waku.lightPush.send(encoder2, {
payload: utf8ToBytes("Hello World3") payload: utf8ToBytes("Hello World3")
}); });
expect(request2.successes.length).to.eq(1); expect(request2.successes.length).to.eq(2);
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] pubsubTopic: shardInfoToPubsubTopics(shardInfoSecondShard)[0]
}) })
).to.eq(true); ).to.eq(true);
}); });
it("using a protocol with unconfigured pubsub topic should fail", async function () { it("using a protocol with unconfigured pubsub topic should fail", async function () {
this.timeout(15_000); this.timeout(15_000);
waku = await createLightNode({ [serviceNodes, waku] = await runMultipleNodes(
networkConfig: shardInfoFirstShard this.ctx,
}); shardInfoFirstShard,
{
store: true,
lightpush: true,
relay: true,
clusterId: clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfoFirstShard)
},
false,
2,
true
);
// use a pubsub topic that is not configured // use a pubsub topic that is not configured
const encoder = createEncoder({ const encoder = createEncoder({
@ -261,17 +250,17 @@ describe("Static Sharding: Running Nodes", function () {
payload: utf8ToBytes("Hello World") payload: utf8ToBytes("Hello World")
}); });
if (successes.length > 0 || failures?.length === 0) { if (successes.length > 0 || !failures?.length) {
throw new Error("The request should've thrown an error"); throw new Error("The request should've thrown an error");
} }
const errors = failures?.map((failure) => failure.error); const errors = failures.map((failure) => failure.error);
expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED);
}); });
it("start node with empty shard should fail", async function () { it("start node with empty shard should fail", async function () {
try { try {
waku = await createLightNode({ await createLightNode({
networkConfig: { clusterId: clusterId, shards: [] } networkConfig: { clusterId: clusterId, shards: [] }
}); });
throw new Error( throw new Error(

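For reference, a minimal sketch of the static-sharding helpers these tests build on, assuming they are exported from @waku/utils like the other sharding utilities used in this suite:

```typescript
// Sketch only: assumes singleShardInfosToShardInfo and shardInfoToPubsubTopics
// are exported from @waku/utils, alongside the other sharding helpers.
import {
  shardInfoToPubsubTopics,
  singleShardInfosToShardInfo
} from "@waku/utils";

const singleShardInfo = { clusterId: 0, shard: 0 };
const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);

// Expected to yield ["/waku/2/rs/0/0"]: one pubsub topic per configured shard.
console.log(shardInfoToPubsubTopics(shardInfo));
```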
View File

@ -304,13 +304,10 @@ describe("Waku Store, general", function () {
for await (const msg of query) { for await (const msg of query) {
if (msg) { if (msg) {
messages.push(msg as DecodedMessage); messages.push(msg as DecodedMessage);
console.log(bytesToUtf8(msg.payload!));
} }
} }
} }
console.log(messages.length);
// Messages are ordered from oldest to latest within a page (1 page query) // Messages are ordered from oldest to latest within a page (1 page query)
expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText);
expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); expect(bytesToUtf8(messages[1].payload!)).to.eq(symText);
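The nested loops above reflect the page-based store API: queryGenerator yields pages of message promises. A minimal sketch of that pattern, assuming the standard waku.store.queryGenerator API (the helper name below is illustrative):

```typescript
// Sketch of the paged store query pattern; assumes the standard
// waku.store.queryGenerator API, which yields pages of message promises.
import { createDecoder, DecodedMessage } from "@waku/core";
import type { LightNode } from "@waku/interfaces";

async function collectStoredMessages(
  waku: LightNode,
  contentTopic: string
): Promise<DecodedMessage[]> {
  const messages: DecodedMessage[] = [];
  for await (const page of waku.store.queryGenerator([
    createDecoder(contentTopic)
  ])) {
    for await (const msg of page) {
      if (msg) {
        messages.push(msg);
      }
    }
  }
  return messages;
}
```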

View File

@ -1,26 +1,24 @@
import { createDecoder } from "@waku/core"; import { createDecoder } from "@waku/core";
import type { ContentTopicInfo, IMessage, LightNode } from "@waku/interfaces"; import type { AutoSharding, IMessage, LightNode } from "@waku/interfaces";
import { createLightNode, Protocols } from "@waku/sdk"; import { Protocols } from "@waku/sdk";
import { import { contentTopicToPubsubTopic } from "@waku/utils";
contentTopicToPubsubTopic,
pubsubTopicToSingleShardInfo
} from "@waku/utils";
import { expect } from "chai"; import { expect } from "chai";
import { import {
afterEachCustom, afterEachCustom,
beforeEachCustom, beforeEachCustom,
makeLogFileName, makeLogFileName,
NOISE_KEY_1, runMultipleNodes,
ServiceNode, ServiceNode,
tearDownNodes ServiceNodesFleet,
tearDownNodes,
teardownNodesWithRedundancy
} from "../../src/index.js"; } from "../../src/index.js";
import { import {
processQueriedMessages, processQueriedMessages,
runStoreNodes, runStoreNodes,
sendMessages, sendMessages,
sendMessagesAutosharding,
TestDecoder, TestDecoder,
TestDecoder2, TestDecoder2,
TestShardInfo, TestShardInfo,
@ -156,8 +154,7 @@ describe("Waku Store, custom pubsub topic", function () {
describe("Waku Store (Autosharding), custom pubsub topic", function () { describe("Waku Store (Autosharding), custom pubsub topic", function () {
this.timeout(15000); this.timeout(15000);
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let serviceNodes: ServiceNodesFleet;
let nwaku2: ServiceNode;
const customContentTopic1 = "/waku/2/content/utf8"; const customContentTopic1 = "/waku/2/content/utf8";
const customContentTopic2 = "/myapp/1/latest/proto"; const customContentTopic2 = "/myapp/1/latest/proto";
@ -170,29 +167,35 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () {
customContentTopic2, customContentTopic2,
clusterId clusterId
); );
const customDecoder1 = createDecoder( const customDecoder1 = createDecoder(customContentTopic1, { clusterId: 5 });
customContentTopic1, const customDecoder2 = createDecoder(customContentTopic2, { clusterId: 5 });
pubsubTopicToSingleShardInfo(autoshardingPubsubTopic1) const contentTopicInfoBothShards: AutoSharding = {
);
const customDecoder2 = createDecoder(
customContentTopic2,
pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2)
);
const contentTopicInfoBothShards: ContentTopicInfo = {
clusterId, clusterId,
contentTopics: [customContentTopic1, customContentTopic2] contentTopics: [customContentTopic1, customContentTopic2]
}; };
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runStoreNodes(this.ctx, contentTopicInfoBothShards); [serviceNodes, waku] = await runMultipleNodes(
this.ctx,
contentTopicInfoBothShards,
{ store: true }
);
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], waku); await teardownNodesWithRedundancy(serviceNodes, waku);
}); });
it("Generator, custom pubsub topic", async function () { it("Generator, custom pubsub topic", async function () {
await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); for (let i = 0; i < totalMsgs; i++) {
await serviceNodes.sendRelayMessage(
ServiceNode.toMessageRpcQuery({
payload: new Uint8Array([0]),
contentTopic: customContentTopic1
}),
autoshardingPubsubTopic1
);
}
const messages = await processQueriedMessages( const messages = await processQueriedMessages(
waku, waku,
@ -211,8 +214,22 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () {
this.timeout(10000); this.timeout(10000);
const totalMsgs = 10; const totalMsgs = 10;
await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); for (let i = 0; i < totalMsgs; i++) {
await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic2); await serviceNodes.sendRelayMessage(
ServiceNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: customContentTopic1
}),
autoshardingPubsubTopic1
);
await serviceNodes.sendRelayMessage(
ServiceNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: customContentTopic2
}),
autoshardingPubsubTopic2
);
}
const customMessages = await processQueriedMessages( const customMessages = await processQueriedMessages(
waku, waku,
@ -236,54 +253,6 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () {
}); });
expect(result2).to.not.eq(-1); expect(result2).to.not.eq(-1);
}); });
it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () {
this.timeout(10000);
// Set up and start a new nwaku node with Default Pubsubtopic
nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
await nwaku2.start({
store: true,
pubsubTopic: [autoshardingPubsubTopic2],
contentTopic: [customContentTopic2],
relay: true,
clusterId
});
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
const totalMsgs = 10;
await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1);
await sendMessagesAutosharding(nwaku2, totalMsgs, customContentTopic2);
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
networkConfig: contentTopicInfoBothShards
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Store]);
let customMessages: IMessage[] = [];
let testMessages: IMessage[] = [];
while (
customMessages.length != totalMsgs ||
testMessages.length != totalMsgs
) {
customMessages = await processQueriedMessages(
waku,
[customDecoder1],
autoshardingPubsubTopic1
);
testMessages = await processQueriedMessages(
waku,
[customDecoder2],
autoshardingPubsubTopic2
);
}
});
}); });
describe("Waku Store (named sharding), custom pubsub topic", function () { describe("Waku Store (named sharding), custom pubsub topic", function () {