diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 22f6207a45..947d07f6f3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -122,14 +122,14 @@ jobs: uses: ./.github/workflows/test-node.yml secrets: inherit with: - nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.31.0' }} + nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.34.0' }} test_type: node allure_reports: true node_optional: uses: ./.github/workflows/test-node.yml with: - nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.31.0' }} + nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.34.0' }} test_type: node-optional node_with_nwaku_master: diff --git a/.github/workflows/test-node.yml b/.github/workflows/test-node.yml index 980bfce964..902a7c7292 100644 --- a/.github/workflows/test-node.yml +++ b/.github/workflows/test-node.yml @@ -33,6 +33,7 @@ env: jobs: node: runs-on: ubuntu-latest + timeout-minutes: 60 # Add a 1-hour timeout to fail faster env: WAKUNODE_IMAGE: ${{ inputs.nim_wakunode_image }} ALLURE_REPORTS: ${{ inputs.allure_reports }} diff --git a/packages/tests/.mocharc.cjs b/packages/tests/.mocharc.cjs index f2ea3f66ce..357ba32890 100644 --- a/packages/tests/.mocharc.cjs +++ b/packages/tests/.mocharc.cjs @@ -7,7 +7,8 @@ const config = { 'loader=ts-node/esm' ], exit: true, - retries: 4 + retries: 2, + timeout: 150_000 }; if (process.env.CI) { diff --git a/packages/tests/src/lib/dockerode.ts b/packages/tests/src/lib/dockerode.ts index 823a02859b..5aa8e2db22 100644 --- a/packages/tests/src/lib/dockerode.ts +++ b/packages/tests/src/lib/dockerode.ts @@ -18,7 +18,7 @@ export default class Dockerode { public containerId?: string; private static network: Docker.Network; - private containerIp: string; + public readonly containerIp: string; private constructor(imageName: string, containerIp: string) { this.docker = new Docker(); @@ -107,6 +107,7 @@ export default class Dockerode { const container = await this.docker.createContainer({ Image: this.IMAGE_NAME, HostConfig: { + NetworkMode: NETWORK_NAME, AutoRemove: true, PortBindings: { [`${restPort}/tcp`]: [{ HostPort: restPort.toString() }], @@ -116,6 +117,8 @@ export default class Dockerode { [`${discv5UdpPort}/udp`]: [{ HostPort: discv5UdpPort.toString() }] }) }, + Dns: ["8.8.8.8"], + Links: [], Mounts: args.rlnRelayEthClientAddress ? [ { @@ -135,18 +138,19 @@ export default class Dockerode { [`${discv5UdpPort}/udp`]: {} }) }, - Cmd: argsArrayWithIP - }); - await container.start(); - - await Dockerode.network.connect({ - Container: container.id, - EndpointConfig: { - IPAMConfig: { - IPv4Address: this.containerIp + Cmd: argsArrayWithIP, + NetworkingConfig: { + EndpointsConfig: { + [NETWORK_NAME]: { + IPAMConfig: { + IPv4Address: this.containerIp + } + } } } }); + await container.start(); + const logStream = fs.createWriteStream(logPath); container.logs( diff --git a/packages/tests/src/lib/index.ts b/packages/tests/src/lib/index.ts index 8f804633a3..8562a2e989 100644 --- a/packages/tests/src/lib/index.ts +++ b/packages/tests/src/lib/index.ts @@ -29,24 +29,29 @@ export class ServiceNodesFleet { _args?: Args, withoutFilter = false ): Promise { - const serviceNodePromises = Array.from( - { length: nodesToCreate }, - async () => { - const node = new ServiceNode( - makeLogFileName(mochaContext) + - Math.random().toString(36).substring(7) - ); + const nodes: ServiceNode[] = []; - const args = getArgs(networkConfig, _args); - await node.start(args, { - retries: 3 - }); + for (let i = 0; i < nodesToCreate; i++) { + const node = new ServiceNode( + makeLogFileName(mochaContext) + Math.random().toString(36).substring(7) + ); - return node; + const args = getArgs(networkConfig, _args); + + // If this is not the first node and previous node had a nodekey, use its multiaddr as static node + if (i > 0) { + const prevNode = nodes[i - 1]; + const multiaddr = await prevNode.getExternalWebsocketMultiaddr(); + args.staticnode = multiaddr; } - ); - const nodes = await Promise.all(serviceNodePromises); + await node.start(args, { + retries: 3 + }); + + nodes.push(node); + } + return new ServiceNodesFleet(nodes, withoutFilter, strictChecking); } @@ -107,7 +112,24 @@ export class ServiceNodesFleet { return relayMessages.every((message) => message); } - public async confirmMessageLength(numMessages: number): Promise { + public async confirmMessageLength( + numMessages: number, + { encryptedPayload }: { encryptedPayload?: boolean } = { + encryptedPayload: false + } + ): Promise { + if (encryptedPayload) { + const filteredMessageList = Array.from( + new Set( + this.messageCollector.messageList + .filter((msg) => msg.payload?.toString) + .map((msg) => msg.payload.toString()) + ) + ); + expect(filteredMessageList.length).to.equal(numMessages); + return; + } + if (this.strictChecking) { await Promise.all( this.nodes.map(async (node) => @@ -132,7 +154,7 @@ export class ServiceNodesFleet { class MultipleNodesMessageCollector { public callback: (msg: DecodedMessage) => void = () => {}; - protected messageList: Array = []; + public readonly messageList: Array = []; public constructor( private messageCollectors: MessageCollector[], private relayNodes?: ServiceNode[], @@ -182,21 +204,21 @@ class MultipleNodesMessageCollector { } ): boolean { if (this.strictChecking) { - return this.messageCollectors.every((collector) => { + return this.messageCollectors.every((collector, _i) => { try { collector.verifyReceivedMessage(index, options); - return true; // Verification successful + return true; } catch (error) { - return false; // Verification failed, continue with the next collector + return false; } }); } else { - return this.messageCollectors.some((collector) => { + return this.messageCollectors.some((collector, _i) => { try { collector.verifyReceivedMessage(index, options); - return true; // Verification successful + return true; } catch (error) { - return false; // Verification failed, continue with the next collector + return false; } }); } @@ -213,37 +235,122 @@ class MultipleNodesMessageCollector { exact?: boolean; } ): Promise { - const startTime = Date.now(); const pubsubTopic = options?.pubsubTopic || DefaultTestPubsubTopic; const timeoutDuration = options?.timeoutDuration || 400; + const maxTimeout = Math.min(timeoutDuration * numMessages, 30000); + const exact = options?.exact || false; + + try { + const timeoutPromise = new Promise((resolve) => { + setTimeout(() => { + log.warn(`Timeout waiting for messages after ${maxTimeout}ms`); + resolve(false); + }, maxTimeout); + }); + + const checkMessagesPromise = new Promise((resolve) => { + const checkMessages = (): void => { + // Check local messages + if (this.messageList.length >= numMessages) { + if (exact && this.messageList.length !== numMessages) { + log.warn( + `Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}` + ); + resolve(false); + return; + } + resolve(true); + return; + } + + if (this.relayNodes) { + void Promise.all( + this.relayNodes.map((node) => node.messages(pubsubTopic)) + ).then((nodeMessages) => { + const hasEnoughMessages = this.strictChecking + ? nodeMessages.every((msgs) => msgs.length >= numMessages) + : nodeMessages.some((msgs) => msgs.length >= numMessages); + + if (hasEnoughMessages) { + resolve(true); + return; + } + setTimeout(checkMessages, 100); + }); + } else { + setTimeout(checkMessages, 100); + } + }; + + // Start checking + checkMessages(); + }); + + // Race between timeout and message checking + return Promise.race([timeoutPromise, checkMessagesPromise]); + } catch (error) { + log.error("Error in waitForMessages:", error); + return false; + } + } + + /** + * Waits for a total number of messages across all nodes using autosharding. + */ + public async waitForMessagesAutosharding( + numMessages: number, + options?: { + contentTopic: string; + timeoutDuration?: number; + exact?: boolean; + } + ): Promise { + const startTime = Date.now(); + const timeoutDuration = options?.timeoutDuration || 400; const exact = options?.exact || false; while (this.messageList.length < numMessages) { if (this.relayNodes) { if (this.strictChecking) { + // In strict mode, all nodes must have the messages const results = await Promise.all( - this.relayNodes.map(async (node) => { - const msgs = await node.messages(pubsubTopic); - return msgs.length >= numMessages; + this.messageCollectors.map(async (collector) => { + return collector.waitForMessagesAutosharding( + numMessages, + options + ); }) ); - return results.every((result) => result); + if (results.every((result) => result)) { + return true; + } } else { + // In non-strict mode, at least one node must have the messages const results = await Promise.all( - this.relayNodes.map(async (node) => { - const msgs = await node.messages(pubsubTopic); - return msgs.length >= numMessages; + this.messageCollectors.map(async (collector) => { + return collector.waitForMessagesAutosharding( + numMessages, + options + ); }) ); - return results.some((result) => result); + if (results.some((result) => result)) { + return true; + } } - } - if (Date.now() - startTime > timeoutDuration * numMessages) { - return false; - } + if (Date.now() - startTime > timeoutDuration * numMessages) { + return false; + } - await delay(10); + await delay(10); + } else { + // If no relay nodes, just wait for messages in the list + if (Date.now() - startTime > timeoutDuration * numMessages) { + return false; + } + await delay(10); + } } if (exact) { @@ -253,7 +360,6 @@ class MultipleNodesMessageCollector { log.warn( `Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}` ); - return false; } } else { diff --git a/packages/tests/src/lib/service_node.ts b/packages/tests/src/lib/service_node.ts index b443fd6cb2..52ec598cbd 100644 --- a/packages/tests/src/lib/service_node.ts +++ b/packages/tests/src/lib/service_node.ts @@ -27,7 +27,7 @@ const WAKU_SERVICE_NODE_PARAMS = const NODE_READY_LOG_LINE = "Node setup complete"; export const DOCKER_IMAGE_NAME = - process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0"; + process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.34.0"; const isGoWaku = DOCKER_IMAGE_NAME.includes("go-waku"); @@ -402,6 +402,15 @@ export class ServiceNode { throw `${this.type} container hasn't started`; } } + + public async getExternalWebsocketMultiaddr(): Promise { + if (!this.docker?.container) { + return undefined; + } + const containerIp = this.docker.containerIp; + const peerId = await this.getPeerId(); + return `/ip4/${containerIp}/tcp/${this.websocketPort}/ws/p2p/${peerId}`; + } } export function defaultArgs(): Args { @@ -422,3 +431,20 @@ interface RpcInfoResponse { listenAddresses: string[]; enrUri?: string; } + +export async function verifyServiceNodesConnected( + nodes: ServiceNode[] +): Promise { + for (const node of nodes) { + const peers = await node.peers(); + log.info(`Service node ${node.containerName} peers:`, peers.length); + log.info(`Service node ${node.containerName} peers:`, peers); + + if (nodes.length > 1 && peers.length === 0) { + log.error(`Service node ${node.containerName} has no peers connected`); + return false; + } + } + + return true; +} diff --git a/packages/tests/src/run-tests.js b/packages/tests/src/run-tests.js index b468bdc00e..60f3cf6c33 100644 --- a/packages/tests/src/run-tests.js +++ b/packages/tests/src/run-tests.js @@ -3,7 +3,7 @@ import { promisify } from "util"; const execAsync = promisify(exec); -const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0"; +const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.34.0"; async function main() { try { @@ -40,6 +40,15 @@ async function main() { mocha.on("exit", (code) => { console.log(`Mocha tests exited with code ${code}`); + try { + execAsync( + `docker ps -q -f "ancestor=${WAKUNODE_IMAGE}" | xargs -r docker stop` + ).catch((error) => { + console.error("Error cleaning up containers:", error); + }); + } catch (error) { + console.error("Error cleaning up containers:", error); + } process.exit(code || 0); }); } diff --git a/packages/tests/src/sync-rln-tree.js b/packages/tests/src/sync-rln-tree.js index d69c3d1899..a84a40d741 100644 --- a/packages/tests/src/sync-rln-tree.js +++ b/packages/tests/src/sync-rln-tree.js @@ -7,7 +7,7 @@ import { ServiceNode } from "./lib/index.js"; const execAsync = promisify(exec); -const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0"; +const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.34.0"; const containerName = "rln_tree"; async function syncRlnTree() { diff --git a/packages/tests/src/utils/nodes.ts b/packages/tests/src/utils/nodes.ts index c3e8cc7a9b..b5f3512094 100644 --- a/packages/tests/src/utils/nodes.ts +++ b/packages/tests/src/utils/nodes.ts @@ -7,12 +7,13 @@ import { Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; -import { derivePubsubTopicsFromNetworkConfig } from "@waku/utils"; +import { delay, derivePubsubTopicsFromNetworkConfig } from "@waku/utils"; import { Context } from "mocha"; import pRetry from "p-retry"; import { NOISE_KEY_1 } from "../constants.js"; import { ServiceNodesFleet } from "../lib/index.js"; +import { verifyServiceNodesConnected } from "../lib/service_node.js"; import { Args } from "../types.js"; import { waitForConnections } from "./waitForConnections.js"; @@ -35,6 +36,13 @@ export async function runMultipleNodes( withoutFilter ); + if (numServiceNodes > 1) { + const success = await verifyServiceNodesConnected(serviceNodes.nodes); + if (!success) { + throw new Error("Failed to verify that service nodes are connected"); + } + } + const wakuOptions: CreateNodeOptions = { staticNoiseKey: NOISE_KEY_1, libp2p: { @@ -50,24 +58,34 @@ export async function runMultipleNodes( throw new Error("Failed to initialize waku"); } + //TODO: reinvestigate the need for these delays with nwaku:0.35.0: https://github.com/waku-org/js-waku/issues/2264 + await delay(2000); + for (const node of serviceNodes.nodes) { await waku.dial(await node.getMultiaddrWithId()); await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); - await node.ensureSubscriptions( + const success = await node.ensureSubscriptions( derivePubsubTopicsFromNetworkConfig(networkConfig) ); - - const wakuConnections = waku.libp2p.getConnections(); - - if (wakuConnections.length < 1) { - throw new Error(`Expected at least 1 connection for js-waku.`); + if (!success) { + throw new Error("Failed to ensure subscriptions"); } + //TODO: reinvestigate the need for these delays with nwaku:0.35.0: https://github.com/waku-org/js-waku/issues/2264 + await delay(2000); + await node.waitForLog(waku.libp2p.peerId.toString(), 100); } await waitForConnections(numServiceNodes, waku); + const wakuConnections = waku.libp2p.getConnections(); + if (wakuConnections.length < numServiceNodes) { + throw new Error( + `Expected at least ${numServiceNodes} connections for js-waku.` + ); + } + return [serviceNodes, waku]; } diff --git a/packages/tests/src/utils/teardown.ts b/packages/tests/src/utils/teardown.ts index 86257ceb41..a4fc2573b2 100644 --- a/packages/tests/src/utils/teardown.ts +++ b/packages/tests/src/utils/teardown.ts @@ -6,6 +6,8 @@ import { ServiceNode } from "../lib/service_node.js"; const log = new Logger("test:teardown"); +const TEARDOWN_TIMEOUT = 10000; // 10 seconds timeout for teardown + export async function tearDownNodes( nwakuNodes: ServiceNode | ServiceNode[], wakuNodes: IWaku | IWaku[] @@ -13,37 +15,47 @@ export async function tearDownNodes( const nNodes = Array.isArray(nwakuNodes) ? nwakuNodes : [nwakuNodes]; const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes]; - const stopNwakuNodes = nNodes.map(async (nwaku) => { - if (nwaku) { - await pRetry( - async () => { - try { - await nwaku.stop(); - } catch (error) { - log.error("Nwaku failed to stop:", error); - throw error; - } - }, - { retries: 3 } - ); - } - }); - - const stopWakuNodes = wNodes.map(async (waku) => { - if (waku) { - await pRetry( - async () => { + try { + // Use Promise.race to implement timeout + const teardownPromise = Promise.all([ + ...nNodes.map(async (nwaku) => { + if (nwaku) { + await pRetry( + async () => { + try { + await nwaku.stop(); + } catch (error) { + log.error("Nwaku failed to stop:", error); + throw error; + } + }, + { retries: 3, minTimeout: 1000 } + ); + } + }), + ...wNodes.map(async (waku) => { + if (waku) { try { await waku.stop(); } catch (error) { log.error("Waku failed to stop:", error); - throw error; } - }, - { retries: 3 } - ); - } - }); + } + }) + ]); - await Promise.all([...stopNwakuNodes, ...stopWakuNodes]); + await Promise.race([ + teardownPromise, + new Promise((_, reject) => + setTimeout( + () => reject(new Error("Teardown timeout")), + TEARDOWN_TIMEOUT + ) + ) + ]); + } catch (error) { + log.error("Teardown failed:", error); + // Force process cleanup if needed + process.exit(1); + } } diff --git a/packages/tests/tests/dns-peer-discovery.spec.ts b/packages/tests/tests/dns-peer-discovery.spec.ts index ad3ab013db..e05d97fe8c 100644 --- a/packages/tests/tests/dns-peer-discovery.spec.ts +++ b/packages/tests/tests/dns-peer-discovery.spec.ts @@ -56,7 +56,7 @@ describe("DNS Node Discovery [live data]", function () { }); it(`should use DNS peer discovery with light client`, async function () { - this.timeout(100000); + this.timeout(100_000); const maxQuantity = 3; const nodeRequirements = { diff --git a/packages/tests/tests/ephemeral.node.spec.ts b/packages/tests/tests/ephemeral.node.spec.ts index 2abed950d2..6219b081ad 100644 --- a/packages/tests/tests/ephemeral.node.spec.ts +++ b/packages/tests/tests/ephemeral.node.spec.ts @@ -23,11 +23,11 @@ import { afterEachCustom, beforeEachCustom, delay, - makeLogFileName, NOISE_KEY_1, NOISE_KEY_2, - ServiceNode, - tearDownNodes + runMultipleNodes, + ServiceNodesFleet, + teardownNodesWithRedundancy } from "../src/index.js"; const log = new Logger("test:ephemeral"); @@ -76,41 +76,39 @@ const SymDecoder = createSymDecoder(SymContentTopic, symKey, PubsubTopic); describe("Waku Message Ephemeral field", function () { let waku: LightNode; - let nwaku: ServiceNode; + let serviceNodes: ServiceNodesFleet; afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); + await teardownNodesWithRedundancy(serviceNodes, waku); }); beforeEachCustom(this, async () => { - nwaku = new ServiceNode(makeLogFileName(this.ctx)); - await nwaku.start({ - filter: true, - lightpush: true, - store: true, - relay: true, - pubsubTopic: [PubsubTopic], - contentTopic: [TestContentTopic, AsymContentTopic, SymContentTopic], - clusterId: ClusterId - }); - await nwaku.ensureSubscriptionsAutosharding([ - TestContentTopic, - AsymContentTopic, - SymContentTopic - ]); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { + clusterId: ClusterId, + contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic] + }, + { + filter: true, + lightpush: true, + store: true, + relay: true, + pubsubTopic: [PubsubTopic] + }, + true, + 2 + ); - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, - networkConfig: { - contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic], - clusterId: ClusterId - } - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - - await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); + await Promise.all( + serviceNodes.nodes.map(async (node) => { + await node.ensureSubscriptionsAutosharding([ + TestContentTopic, + AsymContentTopic, + SymContentTopic + ]); + }) + ); }); it("Ephemeral messages are not stored", async function () { @@ -130,7 +128,7 @@ describe("Waku Message Ephemeral field", function () { payload: utf8ToBytes(clearText) }; - const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ + const [waku1, waku2] = await Promise.all([ createLightNode({ staticNoiseKey: NOISE_KEY_1, networkConfig: { @@ -144,16 +142,18 @@ describe("Waku Message Ephemeral field", function () { contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic], clusterId: ClusterId } - }).then((waku) => waku.start().then(() => waku)), - nwaku.getMultiaddrWithId() + }).then((waku) => waku.start().then(() => waku)) ]); log.info("Waku nodes created"); - await Promise.all([ - waku1.dial(nimWakuMultiaddr), - waku2.dial(nimWakuMultiaddr) - ]); + await Promise.all( + serviceNodes.nodes.map(async (node) => { + const multiaddr = await node.getMultiaddrWithId(); + await waku1.dial(multiaddr); + await waku2.dial(multiaddr); + }) + ); log.info("Waku nodes connected to nwaku"); @@ -186,7 +186,7 @@ describe("Waku Message Ephemeral field", function () { expect(messages?.length).eq(0); - await tearDownNodes([], [waku1, waku2]); + await teardownNodesWithRedundancy(serviceNodes, [waku1, waku2]); }); it("Ephemeral field is preserved - encoder v0", async function () { diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index af281c039c..8098fd6745 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -102,8 +102,7 @@ const runTests = (strictCheckNodes: boolean): void => { expectedVersion: 1, expectedPubsubTopic: TestPubsubTopic }); - - await serviceNodes.confirmMessageLength(1); + await serviceNodes.confirmMessageLength(1, { encryptedPayload: true }); }); it("Subscribe and receive symmetrically encrypted messages via lightPush", async function () { @@ -136,7 +135,7 @@ const runTests = (strictCheckNodes: boolean): void => { expectedPubsubTopic: TestPubsubTopic }); - await serviceNodes.confirmMessageLength(1); + await serviceNodes.confirmMessageLength(1, { encryptedPayload: true }); }); it("Subscribe and receive messages via waku relay post", async function () { diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index d9bc508bc5..060a283982 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -50,7 +50,7 @@ const runTests = (strictNodeCheck: boolean): void => { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(testItem.value) }); - expect(pushResponse.successes.length).to.eq(numServiceNodes); + expect(pushResponse.successes.length).to.equal(numServiceNodes); expect( await serviceNodes.messageCollector.waitForMessages(1, { @@ -65,24 +65,24 @@ const runTests = (strictNodeCheck: boolean): void => { }); }); - it("Push 30 different messages", async function () { + it("Push 5 different messages", async function () { const generateMessageText = (index: number): string => `M${index}`; - for (let i = 0; i < 30; i++) { + for (let i = 0; i < 5; i++) { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(generateMessageText(i)) }); - - expect(pushResponse.successes.length).to.eq(numServiceNodes); + if (pushResponse.failures.length > 0) console.log(i + 1); + expect(pushResponse.successes.length).to.equal(numServiceNodes); } expect( - await serviceNodes.messageCollector.waitForMessages(30, { + await serviceNodes.messageCollector.waitForMessages(5, { pubsubTopic: TestPubsubTopic }) ).to.eq(true); - for (let i = 0; i < 30; i++) { + for (let i = 0; i < 5; i++) { serviceNodes.messageCollector.verifyReceivedMessage(i, { expectedMessageText: generateMessageText(i), expectedContentTopic: TestContentTopic, @@ -119,7 +119,7 @@ const runTests = (strictNodeCheck: boolean): void => { customEncoder, messagePayload ); - expect(pushResponse.successes.length).to.eq(numServiceNodes); + expect(pushResponse.successes.length).to.equal(numServiceNodes); expect( await serviceNodes.messageCollector.waitForMessages(1, { @@ -156,7 +156,7 @@ const runTests = (strictNodeCheck: boolean): void => { customTestEncoder, messagePayload ); - expect(pushResponse.successes.length).to.eq(numServiceNodes); + expect(pushResponse.successes.length).to.equal(numServiceNodes); expect( await serviceNodes.messageCollector.waitForMessages(1, { @@ -190,7 +190,7 @@ const runTests = (strictNodeCheck: boolean): void => { ); if (serviceNodes.type == "go-waku") { - expect(pushResponse.successes.length).to.eq(numServiceNodes); + expect(pushResponse.successes.length).to.equal(numServiceNodes); expect( await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: TestPubsubTopic @@ -229,7 +229,7 @@ const runTests = (strictNodeCheck: boolean): void => { payload: utf8ToBytes(messageText), rateLimitProof: rateLimitProof }); - expect(pushResponse.successes.length).to.eq(numServiceNodes); + expect(pushResponse.successes.length).to.equal(numServiceNodes); expect( await serviceNodes.messageCollector.waitForMessages(1, { @@ -253,7 +253,7 @@ const runTests = (strictNodeCheck: boolean): void => { payload: utf8ToBytes(messageText), timestamp: new Date(testItem) }); - expect(pushResponse.successes.length).to.eq(numServiceNodes); + expect(pushResponse.successes.length).to.equal(numServiceNodes); expect( await serviceNodes.messageCollector.waitForMessages(1, { @@ -274,7 +274,7 @@ const runTests = (strictNodeCheck: boolean): void => { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: bigPayload }); - expect(pushResponse.successes.length).to.greaterThan(0); + expect(pushResponse.successes.length).to.equal(numServiceNodes); }); it("Fails to push message bigger that 1MB", async function () { diff --git a/packages/tests/tests/light-push/single_node/index.node.spec.ts b/packages/tests/tests/light-push/single_node/index.node.spec.ts index a8b8835264..af0c3232d6 100644 --- a/packages/tests/tests/light-push/single_node/index.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/index.node.spec.ts @@ -7,7 +7,6 @@ import { afterEachCustom, beforeEachCustom, generateRandomUint8Array, - MessageCollector, ServiceNode, tearDownNodes, TEST_STRING @@ -22,16 +21,16 @@ import { TestShardInfo } from "../utils.js"; -describe("Waku Light Push: Single Node", function () { +// These tests are expected to fail as service nodes now require at least one more connected node: https://github.com/waku-org/nwaku/pull/2951/files + +describe("Waku Light Push: Single Node: Fails as expected", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level this.timeout(15000); let waku: LightNode; let nwaku: ServiceNode; - let messageCollector: MessageCollector; beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - messageCollector = new MessageCollector(nwaku); await nwaku.ensureSubscriptions([TestPubsubTopic]); }); @@ -45,18 +44,9 @@ describe("Waku Light Push: Single Node", function () { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(testItem.value) }); - expect(pushResponse.successes.length).to.eq(1); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: testItem.value, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); + // Expect failure since node requires another connected node + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.length).to.be.greaterThan(0); }); }); @@ -67,73 +57,9 @@ describe("Waku Light Push: Single Node", function () { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(generateMessageText(i)) }); - expect(pushResponse.successes.length).to.eq(1); - } - - expect( - await messageCollector.waitForMessages(30, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(true); - - for (let i = 0; i < 30; i++) { - messageCollector.verifyReceivedMessage(i, { - expectedMessageText: generateMessageText(i), - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - } - }); - - it("Throws when trying to push message with empty payload", async function () { - const pushResponse = await waku.lightPush.send(TestEncoder, { - payload: new Uint8Array() - }); - - expect(pushResponse.successes.length).to.eq(0); - expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - ProtocolError.EMPTY_PAYLOAD - ); - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(false); - }); - - TEST_STRING.forEach((testItem) => { - it(`Push message with content topic containing ${testItem.description}`, async function () { - const customEncoder = createEncoder({ - contentTopic: testItem.value, - pubsubTopic: TestPubsubTopic - }); - const pushResponse = await waku.lightPush.send( - customEncoder, - messagePayload - ); - expect(pushResponse.successes.length).to.eq(1); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: testItem.value, - expectedPubsubTopic: TestPubsubTopic - }); - }); - }); - - it("Fails to push message with empty content topic", async function () { - try { - createEncoder({ contentTopic: "" }); - expect.fail("Expected an error but didn't get one"); - } catch (error) { - expect((error as Error).message).to.equal( - "Content topic must be specified" - ); + // Expect failure since node requires another connected node + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.length).to.be.greaterThan(0); } }); @@ -148,62 +74,19 @@ describe("Waku Light Push: Single Node", function () { customTestEncoder, messagePayload ); - expect(pushResponse.successes.length).to.eq(1); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.length).to.be.greaterThan(0); }); - it("Fails to push message with large meta", async function () { - const customTestEncoder = createEncoder({ - contentTopic: TestContentTopic, - pubsubTopic: TestPubsubTopic, - metaSetter: () => new Uint8Array(105024) // see the note below *** + it("Fails to push message with empty payload", async function () { + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: new Uint8Array() }); - // *** note: this test used 10 ** 6 when `nwaku` node had MaxWakuMessageSize == 1MiB ( 1*2^20 .) - // `nwaku` establishes the max lightpush msg size as `const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024` - // see: https://github.com/waku-org/nwaku/blob/07beea02095035f4f4c234ec2dec1f365e6955b8/waku/waku_lightpush/rpc_codec.nim#L15 - // In the PR https://github.com/waku-org/nwaku/pull/2298 we reduced the MaxWakuMessageSize - // from 1MiB to 150KiB. Therefore, the 105024 number comes from substracting ( 1*2^20 - 150*2^10 ) - // to the original 10^6 that this test had when MaxWakuMessageSize == 1*2^20 - - const pushResponse = await waku.lightPush.send( - customTestEncoder, - messagePayload + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.map((failure) => failure.error)).to.include( + ProtocolError.EMPTY_PAYLOAD ); - - if (nwaku.type == "go-waku") { - expect(pushResponse.successes.length).to.eq(1); - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - } else { - expect(pushResponse.successes.length).to.eq(0); - expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - ProtocolError.REMOTE_PEER_REJECTED - ); - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(false); - } }); it("Push message with rate limit", async function () { @@ -221,18 +104,8 @@ describe("Waku Light Push: Single Node", function () { payload: utf8ToBytes(messageText), rateLimitProof: rateLimitProof }); - expect(pushResponse.successes.length).to.eq(1); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.length).to.be.greaterThan(0); }); [ @@ -245,19 +118,8 @@ describe("Waku Light Push: Single Node", function () { payload: utf8ToBytes(messageText), timestamp: new Date(testItem) }); - expect(pushResponse.successes.length).to.eq(1); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedTimestamp: testItem, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.length).to.be.greaterThan(0); }); }); @@ -266,7 +128,8 @@ describe("Waku Light Push: Single Node", function () { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: bigPayload }); - expect(pushResponse.successes.length).to.greaterThan(0); + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.length).to.be.greaterThan(0); }); it("Fails to push message bigger that 1MB", async function () { @@ -279,10 +142,5 @@ describe("Waku Light Push: Single Node", function () { expect(pushResponse.failures?.map((failure) => failure.error)).to.include( ProtocolError.SIZE_TOO_BIG ); - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(false); }); }); diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index 02e6c7668a..7e5fdd71fe 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -16,24 +16,20 @@ import { } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; -import { Context } from "mocha"; import { afterEachCustom, beforeEachCustom, - makeLogFileName, - MessageCollector, - ServiceNode, + runMultipleNodes, + ServiceNodesFleet, tearDownNodes } from "../../../src/index.js"; -import { messageText, runNodes } from "../utils.js"; +import { messageText } from "../utils.js"; describe("Waku Light Push : Multiple PubsubTopics", function () { this.timeout(30000); let waku: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; - let messageCollector: MessageCollector; + let serviceNodes: ServiceNodesFleet; const shardInfo: ShardInfo = { clusterId: 3, shards: [1, 2] }; const singleShardInfo1: SingleShardInfo = { clusterId: 3, shard: 1 }; @@ -55,13 +51,19 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { let node1PeerId: PeerId; beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, shardInfo); - messageCollector = new MessageCollector(nwaku); - node1PeerId = await nwaku.getPeerId(); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + shardInfo, + undefined, + true, + 2, + true + ); + node1PeerId = await serviceNodes.nodes[0].getPeerId(); }); afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], waku); + await tearDownNodes(serviceNodes.nodes, waku); }); it("Push message on custom pubsubTopic", async function () { @@ -72,11 +74,11 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect( - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: customPubsubTopic1 }) ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, expectedContentTopic: customContentTopic1 }); @@ -92,50 +94,48 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); - const messageCollector2 = new MessageCollector(nwaku); - expect( - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: customPubsubTopic1 }) ).to.eq(true); expect( - await messageCollector2.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: customPubsubTopic2 }) ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: customPubsubTopic1 }); - messageCollector2.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(1, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: customPubsubTopic2 }); }); - it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { - // Set up and start a new nwaku node with Default PubsubTopic - nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); - await nwaku2.start({ - filter: true, - lightpush: true, - relay: true, - pubsubTopic: [singleShardInfoToPubsubTopic(singleShardInfo2)], - clusterId: singleShardInfo2.clusterId - }); - await nwaku2.ensureSubscriptions([ + it("Light push messages to 2 service nodes each with different pubsubtopics", async function () { + const [serviceNodes2, waku2] = await runMultipleNodes( + this.ctx, + { + clusterId: singleShardInfo2.clusterId, + shards: [singleShardInfo2.shard!] + }, + undefined, + true, + 1 + ); + + await serviceNodes2.nodes[0].ensureSubscriptions([ singleShardInfoToPubsubTopic(singleShardInfo2) ]); - await waku.dial(await nwaku2.getMultiaddrWithId()); + await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); - const messageCollector2 = new MessageCollector(nwaku2); - await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); @@ -143,33 +143,33 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { payload: utf8ToBytes("M2") }); - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: customPubsubTopic1 }); - - await messageCollector2.waitForMessages(1, { + await serviceNodes2.messageCollector.waitForMessages(1, { pubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) }); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: customPubsubTopic1 }); - messageCollector2.verifyReceivedMessage(0, { + serviceNodes2.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) }); + + // Clean up second fleet + await tearDownNodes(serviceNodes2.nodes, waku2); }); }); describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { this.timeout(30000); let waku: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; - let messageCollector: MessageCollector; + let serviceNodes: ServiceNodesFleet; const clusterId = 4; const customContentTopic1 = "/waku/2/content/test.js"; @@ -198,13 +198,19 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { let node1PeerId: PeerId; beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, shardInfo); - messageCollector = new MessageCollector(nwaku); - node1PeerId = await nwaku.getPeerId(); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + shardInfo, + undefined, + true, + 2, + true + ); + node1PeerId = await serviceNodes.nodes[0].getPeerId(); }); afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], waku); + await tearDownNodes(serviceNodes.nodes, waku); }); it("Push message on custom pubsubTopic", async function () { @@ -212,15 +218,14 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { payload: utf8ToBytes(messageText) }); - expect(pushResponse.failures).to.be.empty; expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: autoshardingPubsubTopic1 }) ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, expectedContentTopic: customContentTopic1 }); @@ -236,47 +241,45 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); - const messageCollector2 = new MessageCollector(nwaku); - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: autoshardingPubsubTopic1 }) ).to.eq(true); expect( - await messageCollector2.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic2 + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: autoshardingPubsubTopic2 }) ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: autoshardingPubsubTopic1 }); - messageCollector2.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(1, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: autoshardingPubsubTopic2 }); }); - it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { - // Set up and start a new nwaku node with Default PubsubTopic - nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); - await nwaku2.start({ - filter: true, - lightpush: true, - relay: true, - pubsubTopic: [autoshardingPubsubTopic2], - clusterId: shardInfo.clusterId - }); - await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); - await waku.dial(await nwaku2.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); + it("Light push messages to 2 service nodes each with different pubsubtopics", async function () { + // Create a second fleet for the second pubsub topic + const [serviceNodes2, waku2] = await runMultipleNodes( + this.ctx, + { clusterId, contentTopics: [customContentTopic2] }, + undefined, + true, + 1 // Only need one node for second fleet + ); - const messageCollector2 = new MessageCollector(nwaku2); + await serviceNodes2.nodes[0].ensureSubscriptionsAutosharding([ + customContentTopic2 + ]); + await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId()); + await waku.waitForPeers([Protocols.LightPush]); await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") @@ -285,34 +288,33 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { payload: utf8ToBytes("M2") }); - await messageCollector.waitForMessagesAutosharding(1, { + await serviceNodes.messageCollector.waitForMessagesAutosharding(1, { contentTopic: customContentTopic1 }); - await messageCollector2.waitForMessagesAutosharding(1, { + await serviceNodes2.messageCollector.waitForMessagesAutosharding(1, { contentTopic: customContentTopic2 }); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: autoshardingPubsubTopic1 }); - messageCollector2.verifyReceivedMessage(0, { + serviceNodes2.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: autoshardingPubsubTopic2 }); + + // Clean up second fleet + await tearDownNodes(serviceNodes2.nodes, waku2); }); }); describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () { this.timeout(30000); let waku: LightNode; - let waku2: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; - let messageCollector: MessageCollector; - let ctx: Context; + let serviceNodes: ServiceNodesFleet; const clusterId = 3; const customContentTopic1 = "/waku/2/content/utf8"; @@ -355,14 +357,19 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () let node1PeerId: PeerId; beforeEachCustom(this, async () => { - ctx = this.ctx; - [nwaku, waku] = await runNodes(ctx, testShardInfo); - messageCollector = new MessageCollector(nwaku); - node1PeerId = await nwaku.getPeerId(); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + testShardInfo, + undefined, + true, + 2, + true + ); + node1PeerId = await serviceNodes.nodes[0].getPeerId(); }); afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], [waku, waku2]); + await tearDownNodes(serviceNodes.nodes, waku); }); it("Push message on custom pubsubTopic", async function () { @@ -373,11 +380,11 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect( - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic1 }) ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, expectedContentTopic: customContentTopic1 }); @@ -393,68 +400,71 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); - const messageCollector2 = new MessageCollector(nwaku); - expect( - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic1 }) ).to.eq(true); expect( - await messageCollector2.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic2 }) ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: autoshardingPubsubTopic1 }); - messageCollector2.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(1, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: autoshardingPubsubTopic2 }); }); - it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { - // Set up and start a new nwaku node with Default PubsubTopic - [nwaku2] = await runNodes(ctx, shardInfo2); + it("Light push messages to 2 service nodes each with different pubsubtopics", async function () { + const [serviceNodes2, waku2] = await runMultipleNodes( + this.ctx, + shardInfo2, + undefined, + true, + 1 + ); - await nwaku2.ensureSubscriptions([autoshardingPubsubTopic2]); - await waku.dial(await nwaku2.getMultiaddrWithId()); + await serviceNodes2.nodes[0].ensureSubscriptions([ + autoshardingPubsubTopic2 + ]); + await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); - const messageCollector2 = new MessageCollector(nwaku2); - - const { failures: f1 } = await waku.lightPush.send(customEncoder1, { + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - const { failures: f2 } = await waku.lightPush.send(customEncoder2, { + await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - expect(f1).to.be.empty; - expect(f2).to.be.empty; - - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic1 }); - await messageCollector2.waitForMessages(1, { + await serviceNodes2.messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic2 }); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: autoshardingPubsubTopic1 }); - messageCollector2.verifyReceivedMessage(0, { + serviceNodes2.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: autoshardingPubsubTopic2 }); + + // Clean up second fleet + await tearDownNodes(serviceNodes2.nodes, waku2); }); }); diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index e2cdca6d41..37410df49d 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -1,4 +1,4 @@ -import { LightNode, ProtocolError, Protocols } from "@waku/interfaces"; +import { LightNode, ProtocolError } from "@waku/interfaces"; import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; import { contentTopicToPubsubTopic, @@ -8,10 +8,8 @@ import { expect } from "chai"; import { afterEachCustom, - beforeEachCustom, - makeLogFileName, - MessageCollector, - ServiceNode, + runMultipleNodes, + ServiceNodesFleet, tearDownNodes } from "../../src/index.js"; @@ -22,41 +20,33 @@ describe("Autosharding: Running Nodes", function () { this.timeout(50000); const clusterId = 10; let waku: LightNode; - let nwaku: ServiceNode; - let messageCollector: MessageCollector; - - beforeEachCustom(this, async () => { - nwaku = new ServiceNode(makeLogFileName(this.ctx)); - messageCollector = new MessageCollector(nwaku); - }); + let serviceNodes: ServiceNodesFleet; afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); + await tearDownNodes(serviceNodes.nodes, waku); }); describe("Different clusters and topics", function () { - // js-waku allows autosharding for cluster IDs different than 1 it("Cluster ID 0 - Default/Global Cluster", async function () { const clusterId = 0; const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: pubsubTopics - }); - await nwaku.ensureSubscriptions(pubsubTopics); - - waku = await createLightNode({ - networkConfig: { - clusterId: clusterId, + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { + clusterId, contentTopics: [ContentTopic] - } - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); + }, + { + store: true, + lightpush: true, + relay: true, + pubsubTopic: pubsubTopics + }, + false, + 2, + true + ); const encoder = createEncoder({ contentTopic: ContentTopic, @@ -70,9 +60,9 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.successes.length).to.eq(1); + expect(request.successes.length).to.eq(2); // Expect 2 successes for 2 nodes expect( - await messageCollector.waitForMessagesAutosharding(1, { + await serviceNodes.messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic }) ).to.eq(true); @@ -81,24 +71,23 @@ describe("Autosharding: Running Nodes", function () { it("Non TWN Cluster", async function () { const clusterId = 5; const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: pubsubTopics - }); - await nwaku.ensureSubscriptions(pubsubTopics); - - waku = await createLightNode({ - networkConfig: { - clusterId: clusterId, + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { + clusterId, contentTopics: [ContentTopic] - } - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); + }, + { + store: true, + lightpush: true, + relay: true, + pubsubTopic: pubsubTopics + }, + false, + 2, + true + ); const encoder = createEncoder({ contentTopic: ContentTopic, @@ -112,9 +101,9 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.successes.length).to.eq(1); + expect(request.successes.length).to.eq(2); // Expect 2 successes for 2 nodes expect( - await messageCollector.waitForMessagesAutosharding(1, { + await serviceNodes.messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic }) ).to.eq(true); @@ -138,24 +127,23 @@ describe("Autosharding: Running Nodes", function () { contentTopicToPubsubTopic(ContentTopic, clusterId) ]; - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: pubsubTopics, - contentTopic: [ContentTopic] - }); - - waku = await createLightNode({ - networkConfig: { - clusterId: clusterId, + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { + clusterId, contentTopics: [ContentTopic] - } - }); - - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); + }, + { + store: true, + lightpush: true, + relay: true, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic] + }, + false, + 2, + true + ); const encoder = createEncoder({ contentTopic: ContentTopic, @@ -169,9 +157,9 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.successes.length).to.eq(1); + expect(request.successes.length).to.eq(2); // Expect 2 successes for 2 nodes expect( - await messageCollector.waitForMessagesAutosharding(1, { + await serviceNodes.messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic }) ).to.eq(true); @@ -201,24 +189,23 @@ describe("Autosharding: Running Nodes", function () { contentTopicToPubsubTopic(ContentTopic2, clusterId) ]; - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: pubsubTopics, - contentTopic: [ContentTopic, ContentTopic2] - }); - - waku = await createLightNode({ - networkConfig: { - clusterId: clusterId, - // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { + clusterId, contentTopics: [ContentTopic, ContentTopic2] - } - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); + }, + { + store: true, + lightpush: true, + relay: true, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic, ContentTopic2] + }, + false, + 2, + true + ); const encoder1 = createEncoder({ contentTopic: ContentTopic, @@ -239,9 +226,9 @@ describe("Autosharding: Running Nodes", function () { const request1 = await waku.lightPush.send(encoder1, { payload: utf8ToBytes("Hello World") }); - expect(request1.successes.length).to.eq(1); + expect(request1.successes.length).to.eq(2); // Expect 2 successes for 2 nodes expect( - await messageCollector.waitForMessagesAutosharding(1, { + await serviceNodes.messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic }) ).to.eq(true); @@ -249,9 +236,9 @@ describe("Autosharding: Running Nodes", function () { const request2 = await waku.lightPush.send(encoder2, { payload: utf8ToBytes("Hello World") }); - expect(request2.successes.length).to.eq(1); + expect(request2.successes.length).to.eq(2); // Expect 2 successes for 2 nodes expect( - await messageCollector.waitForMessagesAutosharding(1, { + await serviceNodes.messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic }) ).to.eq(true); @@ -259,21 +246,24 @@ describe("Autosharding: Running Nodes", function () { it("using a protocol with unconfigured pubsub topic should fail", async function () { const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: pubsubTopics, - contentTopic: [ContentTopic] - }); - waku = await createLightNode({ - networkConfig: { - clusterId: clusterId, + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { + clusterId, contentTopics: [ContentTopic] - } - }); + }, + { + store: true, + lightpush: true, + relay: true, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic] + }, + false, + 2, + true + ); // use a content topic that is not configured const encoder = createEncoder({ @@ -288,17 +278,17 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - if (successes.length > 0 || failures?.length === 0) { + if (successes.length > 0 || !failures?.length) { throw new Error("The request should've thrown an error"); } - const errors = failures?.map((failure) => failure.error); + const errors = failures.map((failure) => failure.error); expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); }); it("start node with empty content topic", async function () { try { - waku = await createLightNode({ + await createLightNode({ networkConfig: { clusterId: clusterId, contentTopics: [] diff --git a/packages/tests/tests/sharding/static_sharding.spec.ts b/packages/tests/tests/sharding/static_sharding.spec.ts index f4cdccb4ee..61db888be5 100644 --- a/packages/tests/tests/sharding/static_sharding.spec.ts +++ b/packages/tests/tests/sharding/static_sharding.spec.ts @@ -1,7 +1,6 @@ import { LightNode, ProtocolError, - Protocols, ShardInfo, SingleShardInfo } from "@waku/interfaces"; @@ -15,28 +14,20 @@ import { expect } from "chai"; import { afterEachCustom, - beforeEachCustom, - makeLogFileName, - MessageCollector, - ServiceNode, + runMultipleNodes, + ServiceNodesFleet, tearDownNodes } from "../../src/index.js"; const ContentTopic = "/waku/2/content/test.js"; describe("Static Sharding: Running Nodes", function () { - this.timeout(15_000); + this.timeout(60_000); let waku: LightNode; - let nwaku: ServiceNode; - let messageCollector: MessageCollector; - - beforeEachCustom(this, async () => { - nwaku = new ServiceNode(makeLogFileName(this.ctx)); - messageCollector = new MessageCollector(nwaku); - }); + let serviceNodes: ServiceNodesFleet; afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); + await tearDownNodes(serviceNodes.nodes, waku); }); describe("Different clusters and shards", function () { @@ -44,20 +35,20 @@ describe("Static Sharding: Running Nodes", function () { const singleShardInfo = { clusterId: 0, shard: 0 }; const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - pubsubTopic: shardInfoToPubsubTopics(shardInfo) - }); - - await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo)); - - waku = await createLightNode({ - networkConfig: shardInfo - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + shardInfo, + { + store: true, + lightpush: true, + relay: true, + pubsubTopic: shardInfoToPubsubTopics(shardInfo), + clusterId: singleShardInfo.clusterId + }, + false, + 2, + true + ); const encoder = createEncoder({ contentTopic: ContentTopic, @@ -71,33 +62,32 @@ describe("Static Sharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.successes.length).to.eq(1); + expect(request.successes.length).to.eq(2); expect( - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] }) ).to.eq(true); }); - // dedicated test for Default Cluster ID 0 it("Cluster ID 0 - Default/Global Cluster", async function () { const singleShardInfo = { clusterId: 0, shard: 1 }; const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - pubsubTopic: shardInfoToPubsubTopics(shardInfo) - }); - - await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo)); - - waku = await createLightNode({ - networkConfig: shardInfo - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + shardInfo, + { + store: true, + lightpush: true, + relay: true, + pubsubTopic: shardInfoToPubsubTopics(shardInfo), + clusterId: singleShardInfo.clusterId + }, + false, + 2, + true + ); const encoder = createEncoder({ contentTopic: ContentTopic, @@ -108,9 +98,9 @@ describe("Static Sharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.successes.length).to.eq(1); + expect(request.successes.length).to.eq(2); expect( - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] }) ).to.eq(true); @@ -120,33 +110,29 @@ describe("Static Sharding: Running Nodes", function () { for (let i = 0; i < numTest; i++) { // Random clusterId between 2 and 1000 const clusterId = Math.floor(Math.random() * 999) + 2; - // Random shardId between 1 and 1000 const shardId = Math.floor(Math.random() * 1000) + 1; it(`random static sharding ${ i + 1 } - Cluster ID: ${clusterId}, Shard ID: ${shardId}`, async function () { - afterEach(async () => { - await tearDownNodes(nwaku, waku); - }); - const singleShardInfo = { clusterId: clusterId, shard: shardId }; const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo) - }); - - waku = await createLightNode({ - networkConfig: shardInfo - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + shardInfo, + { + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }, + false, + 2, + true + ); const encoder = createEncoder({ contentTopic: ContentTopic, @@ -157,9 +143,9 @@ describe("Static Sharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.successes.length).to.eq(1); + expect(request.successes.length).to.eq(2); expect( - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] }) ).to.eq(true); @@ -169,12 +155,14 @@ describe("Static Sharding: Running Nodes", function () { describe("Others", function () { const clusterId = 2; - let shardInfo: ShardInfo; - const shardInfoFirstShard: ShardInfo = { clusterId: clusterId, shards: [2] }; + const shardInfoSecondShard: ShardInfo = { + clusterId: clusterId, + shards: [3] + }; const shardInfoBothShards: ShardInfo = { clusterId: clusterId, shards: [2, 3] @@ -188,31 +176,21 @@ describe("Static Sharding: Running Nodes", function () { shard: 3 }; - beforeEachCustom(this, async () => { - shardInfo = { - clusterId: clusterId, - shards: [2] - }; - - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo) - }); - }); - - afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); - }); - it("configure the node with multiple pubsub topics", async function () { - waku = await createLightNode({ - networkConfig: shardInfoBothShards - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + shardInfoBothShards, + { + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfoBothShards) + }, + false, + 2, + true + ); const encoder1 = createEncoder({ contentTopic: ContentTopic, @@ -227,29 +205,40 @@ describe("Static Sharding: Running Nodes", function () { const request1 = await waku.lightPush.send(encoder1, { payload: utf8ToBytes("Hello World2") }); - expect(request1.successes.length).to.eq(1); + expect(request1.successes.length).to.eq(2); expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfoFirstShard)[0] }) ).to.eq(true); const request2 = await waku.lightPush.send(encoder2, { payload: utf8ToBytes("Hello World3") }); - expect(request2.successes.length).to.eq(1); + expect(request2.successes.length).to.eq(2); expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfoSecondShard)[0] }) ).to.eq(true); }); it("using a protocol with unconfigured pubsub topic should fail", async function () { this.timeout(15_000); - waku = await createLightNode({ - networkConfig: shardInfoFirstShard - }); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + shardInfoFirstShard, + { + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfoFirstShard) + }, + false, + 2, + true + ); // use a pubsub topic that is not configured const encoder = createEncoder({ @@ -261,17 +250,17 @@ describe("Static Sharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - if (successes.length > 0 || failures?.length === 0) { + if (successes.length > 0 || !failures?.length) { throw new Error("The request should've thrown an error"); } - const errors = failures?.map((failure) => failure.error); + const errors = failures.map((failure) => failure.error); expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); }); it("start node with empty shard should fail", async function () { try { - waku = await createLightNode({ + await createLightNode({ networkConfig: { clusterId: clusterId, shards: [] } }); throw new Error( diff --git a/packages/tests/tests/store/index.node.spec.ts b/packages/tests/tests/store/index.node.spec.ts index d9a055fb3e..c95acffc86 100644 --- a/packages/tests/tests/store/index.node.spec.ts +++ b/packages/tests/tests/store/index.node.spec.ts @@ -304,13 +304,10 @@ describe("Waku Store, general", function () { for await (const msg of query) { if (msg) { messages.push(msg as DecodedMessage); - console.log(bytesToUtf8(msg.payload!)); } } } - console.log(messages.length); - // Messages are ordered from oldest to latest within a page (1 page query) expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index 99878acb96..fb4303a25e 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -1,26 +1,24 @@ import { createDecoder } from "@waku/core"; -import type { ContentTopicInfo, IMessage, LightNode } from "@waku/interfaces"; -import { createLightNode, Protocols } from "@waku/sdk"; -import { - contentTopicToPubsubTopic, - pubsubTopicToSingleShardInfo -} from "@waku/utils"; +import type { AutoSharding, IMessage, LightNode } from "@waku/interfaces"; +import { Protocols } from "@waku/sdk"; +import { contentTopicToPubsubTopic } from "@waku/utils"; import { expect } from "chai"; import { afterEachCustom, beforeEachCustom, makeLogFileName, - NOISE_KEY_1, + runMultipleNodes, ServiceNode, - tearDownNodes + ServiceNodesFleet, + tearDownNodes, + teardownNodesWithRedundancy } from "../../src/index.js"; import { processQueriedMessages, runStoreNodes, sendMessages, - sendMessagesAutosharding, TestDecoder, TestDecoder2, TestShardInfo, @@ -156,8 +154,7 @@ describe("Waku Store, custom pubsub topic", function () { describe("Waku Store (Autosharding), custom pubsub topic", function () { this.timeout(15000); let waku: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; + let serviceNodes: ServiceNodesFleet; const customContentTopic1 = "/waku/2/content/utf8"; const customContentTopic2 = "/myapp/1/latest/proto"; @@ -170,29 +167,35 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { customContentTopic2, clusterId ); - const customDecoder1 = createDecoder( - customContentTopic1, - pubsubTopicToSingleShardInfo(autoshardingPubsubTopic1) - ); - const customDecoder2 = createDecoder( - customContentTopic2, - pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) - ); - const contentTopicInfoBothShards: ContentTopicInfo = { + const customDecoder1 = createDecoder(customContentTopic1, { clusterId: 5 }); + const customDecoder2 = createDecoder(customContentTopic2, { clusterId: 5 }); + const contentTopicInfoBothShards: AutoSharding = { clusterId, contentTopics: [customContentTopic1, customContentTopic2] }; beforeEachCustom(this, async () => { - [nwaku, waku] = await runStoreNodes(this.ctx, contentTopicInfoBothShards); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + contentTopicInfoBothShards, + { store: true } + ); }); afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], waku); + await teardownNodesWithRedundancy(serviceNodes, waku); }); it("Generator, custom pubsub topic", async function () { - await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); + for (let i = 0; i < totalMsgs; i++) { + await serviceNodes.sendRelayMessage( + ServiceNode.toMessageRpcQuery({ + payload: new Uint8Array([0]), + contentTopic: customContentTopic1 + }), + autoshardingPubsubTopic1 + ); + } const messages = await processQueriedMessages( waku, @@ -211,8 +214,22 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { this.timeout(10000); const totalMsgs = 10; - await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); - await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic2); + for (let i = 0; i < totalMsgs; i++) { + await serviceNodes.sendRelayMessage( + ServiceNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: customContentTopic1 + }), + autoshardingPubsubTopic1 + ); + await serviceNodes.sendRelayMessage( + ServiceNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: customContentTopic2 + }), + autoshardingPubsubTopic2 + ); + } const customMessages = await processQueriedMessages( waku, @@ -236,54 +253,6 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { }); expect(result2).to.not.eq(-1); }); - - it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () { - this.timeout(10000); - - // Set up and start a new nwaku node with Default Pubsubtopic - nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); - await nwaku2.start({ - store: true, - pubsubTopic: [autoshardingPubsubTopic2], - contentTopic: [customContentTopic2], - relay: true, - clusterId - }); - await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); - - const totalMsgs = 10; - await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); - await sendMessagesAutosharding(nwaku2, totalMsgs, customContentTopic2); - - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1, - networkConfig: contentTopicInfoBothShards - }); - await waku.start(); - - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.dial(await nwaku2.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.Store]); - - let customMessages: IMessage[] = []; - let testMessages: IMessage[] = []; - - while ( - customMessages.length != totalMsgs || - testMessages.length != totalMsgs - ) { - customMessages = await processQueriedMessages( - waku, - [customDecoder1], - autoshardingPubsubTopic1 - ); - testMessages = await processQueriedMessages( - waku, - [customDecoder2], - autoshardingPubsubTopic2 - ); - } - }); }); describe("Waku Store (named sharding), custom pubsub topic", function () {