diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 947d07f6f3..22f6207a45 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -122,14 +122,14 @@ jobs: uses: ./.github/workflows/test-node.yml secrets: inherit with: - nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.34.0' }} + nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.31.0' }} test_type: node allure_reports: true node_optional: uses: ./.github/workflows/test-node.yml with: - nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.34.0' }} + nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.31.0' }} test_type: node-optional node_with_nwaku_master: diff --git a/.github/workflows/test-node.yml b/.github/workflows/test-node.yml index 902a7c7292..980bfce964 100644 --- a/.github/workflows/test-node.yml +++ b/.github/workflows/test-node.yml @@ -33,7 +33,6 @@ env: jobs: node: runs-on: ubuntu-latest - timeout-minutes: 60 # Add a 1-hour timeout to fail faster env: WAKUNODE_IMAGE: ${{ inputs.nim_wakunode_image }} ALLURE_REPORTS: ${{ inputs.allure_reports }} diff --git a/packages/tests/.mocharc.cjs b/packages/tests/.mocharc.cjs index 357ba32890..f2ea3f66ce 100644 --- a/packages/tests/.mocharc.cjs +++ b/packages/tests/.mocharc.cjs @@ -7,8 +7,7 @@ const config = { 'loader=ts-node/esm' ], exit: true, - retries: 2, - timeout: 150_000 + retries: 4 }; if (process.env.CI) { diff --git a/packages/tests/src/lib/dockerode.ts b/packages/tests/src/lib/dockerode.ts index 5aa8e2db22..823a02859b 100644 --- a/packages/tests/src/lib/dockerode.ts +++ b/packages/tests/src/lib/dockerode.ts @@ -18,7 +18,7 @@ export default class Dockerode { public containerId?: string; private static network: Docker.Network; - public readonly containerIp: string; + private containerIp: string; private constructor(imageName: string, containerIp: string) { this.docker = new Docker(); @@ -107,7 +107,6 @@ export default class Dockerode { const container = await this.docker.createContainer({ Image: this.IMAGE_NAME, HostConfig: { - NetworkMode: NETWORK_NAME, AutoRemove: true, PortBindings: { [`${restPort}/tcp`]: [{ HostPort: restPort.toString() }], @@ -117,8 +116,6 @@ export default class Dockerode { [`${discv5UdpPort}/udp`]: [{ HostPort: discv5UdpPort.toString() }] }) }, - Dns: ["8.8.8.8"], - Links: [], Mounts: args.rlnRelayEthClientAddress ? [ { @@ -138,19 +135,18 @@ export default class Dockerode { [`${discv5UdpPort}/udp`]: {} }) }, - Cmd: argsArrayWithIP, - NetworkingConfig: { - EndpointsConfig: { - [NETWORK_NAME]: { - IPAMConfig: { - IPv4Address: this.containerIp - } - } - } - } + Cmd: argsArrayWithIP }); await container.start(); + await Dockerode.network.connect({ + Container: container.id, + EndpointConfig: { + IPAMConfig: { + IPv4Address: this.containerIp + } + } + }); const logStream = fs.createWriteStream(logPath); container.logs( diff --git a/packages/tests/src/lib/index.ts b/packages/tests/src/lib/index.ts index 8562a2e989..8f804633a3 100644 --- a/packages/tests/src/lib/index.ts +++ b/packages/tests/src/lib/index.ts @@ -29,29 +29,24 @@ export class ServiceNodesFleet { _args?: Args, withoutFilter = false ): Promise { - const nodes: ServiceNode[] = []; + const serviceNodePromises = Array.from( + { length: nodesToCreate }, + async () => { + const node = new ServiceNode( + makeLogFileName(mochaContext) + + Math.random().toString(36).substring(7) + ); - for (let i = 0; i < nodesToCreate; i++) { - const node = new ServiceNode( - makeLogFileName(mochaContext) + Math.random().toString(36).substring(7) - ); + const args = getArgs(networkConfig, _args); + await node.start(args, { + retries: 3 + }); - const args = getArgs(networkConfig, _args); - - // If this is not the first node and previous node had a nodekey, use its multiaddr as static node - if (i > 0) { - const prevNode = nodes[i - 1]; - const multiaddr = await prevNode.getExternalWebsocketMultiaddr(); - args.staticnode = multiaddr; + return node; } + ); - await node.start(args, { - retries: 3 - }); - - nodes.push(node); - } - + const nodes = await Promise.all(serviceNodePromises); return new ServiceNodesFleet(nodes, withoutFilter, strictChecking); } @@ -112,24 +107,7 @@ export class ServiceNodesFleet { return relayMessages.every((message) => message); } - public async confirmMessageLength( - numMessages: number, - { encryptedPayload }: { encryptedPayload?: boolean } = { - encryptedPayload: false - } - ): Promise { - if (encryptedPayload) { - const filteredMessageList = Array.from( - new Set( - this.messageCollector.messageList - .filter((msg) => msg.payload?.toString) - .map((msg) => msg.payload.toString()) - ) - ); - expect(filteredMessageList.length).to.equal(numMessages); - return; - } - + public async confirmMessageLength(numMessages: number): Promise { if (this.strictChecking) { await Promise.all( this.nodes.map(async (node) => @@ -154,7 +132,7 @@ export class ServiceNodesFleet { class MultipleNodesMessageCollector { public callback: (msg: DecodedMessage) => void = () => {}; - public readonly messageList: Array = []; + protected messageList: Array = []; public constructor( private messageCollectors: MessageCollector[], private relayNodes?: ServiceNode[], @@ -204,21 +182,21 @@ class MultipleNodesMessageCollector { } ): boolean { if (this.strictChecking) { - return this.messageCollectors.every((collector, _i) => { + return this.messageCollectors.every((collector) => { try { collector.verifyReceivedMessage(index, options); - return true; + return true; // Verification successful } catch (error) { - return false; + return false; // Verification failed, continue with the next collector } }); } else { - return this.messageCollectors.some((collector, _i) => { + return this.messageCollectors.some((collector) => { try { collector.verifyReceivedMessage(index, options); - return true; + return true; // Verification successful } catch (error) { - return false; + return false; // Verification failed, continue with the next collector } }); } @@ -234,123 +212,38 @@ class MultipleNodesMessageCollector { timeoutDuration?: number; exact?: boolean; } - ): Promise { - const pubsubTopic = options?.pubsubTopic || DefaultTestPubsubTopic; - const timeoutDuration = options?.timeoutDuration || 400; - const maxTimeout = Math.min(timeoutDuration * numMessages, 30000); - const exact = options?.exact || false; - - try { - const timeoutPromise = new Promise((resolve) => { - setTimeout(() => { - log.warn(`Timeout waiting for messages after ${maxTimeout}ms`); - resolve(false); - }, maxTimeout); - }); - - const checkMessagesPromise = new Promise((resolve) => { - const checkMessages = (): void => { - // Check local messages - if (this.messageList.length >= numMessages) { - if (exact && this.messageList.length !== numMessages) { - log.warn( - `Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}` - ); - resolve(false); - return; - } - resolve(true); - return; - } - - if (this.relayNodes) { - void Promise.all( - this.relayNodes.map((node) => node.messages(pubsubTopic)) - ).then((nodeMessages) => { - const hasEnoughMessages = this.strictChecking - ? nodeMessages.every((msgs) => msgs.length >= numMessages) - : nodeMessages.some((msgs) => msgs.length >= numMessages); - - if (hasEnoughMessages) { - resolve(true); - return; - } - setTimeout(checkMessages, 100); - }); - } else { - setTimeout(checkMessages, 100); - } - }; - - // Start checking - checkMessages(); - }); - - // Race between timeout and message checking - return Promise.race([timeoutPromise, checkMessagesPromise]); - } catch (error) { - log.error("Error in waitForMessages:", error); - return false; - } - } - - /** - * Waits for a total number of messages across all nodes using autosharding. - */ - public async waitForMessagesAutosharding( - numMessages: number, - options?: { - contentTopic: string; - timeoutDuration?: number; - exact?: boolean; - } ): Promise { const startTime = Date.now(); + const pubsubTopic = options?.pubsubTopic || DefaultTestPubsubTopic; const timeoutDuration = options?.timeoutDuration || 400; const exact = options?.exact || false; while (this.messageList.length < numMessages) { if (this.relayNodes) { if (this.strictChecking) { - // In strict mode, all nodes must have the messages const results = await Promise.all( - this.messageCollectors.map(async (collector) => { - return collector.waitForMessagesAutosharding( - numMessages, - options - ); + this.relayNodes.map(async (node) => { + const msgs = await node.messages(pubsubTopic); + return msgs.length >= numMessages; }) ); - if (results.every((result) => result)) { - return true; - } + return results.every((result) => result); } else { - // In non-strict mode, at least one node must have the messages const results = await Promise.all( - this.messageCollectors.map(async (collector) => { - return collector.waitForMessagesAutosharding( - numMessages, - options - ); + this.relayNodes.map(async (node) => { + const msgs = await node.messages(pubsubTopic); + return msgs.length >= numMessages; }) ); - if (results.some((result) => result)) { - return true; - } + return results.some((result) => result); } - - if (Date.now() - startTime > timeoutDuration * numMessages) { - return false; - } - - await delay(10); - } else { - // If no relay nodes, just wait for messages in the list - if (Date.now() - startTime > timeoutDuration * numMessages) { - return false; - } - await delay(10); } + + if (Date.now() - startTime > timeoutDuration * numMessages) { + return false; + } + + await delay(10); } if (exact) { @@ -360,6 +253,7 @@ class MultipleNodesMessageCollector { log.warn( `Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}` ); + return false; } } else { diff --git a/packages/tests/src/lib/service_node.ts b/packages/tests/src/lib/service_node.ts index 52ec598cbd..b443fd6cb2 100644 --- a/packages/tests/src/lib/service_node.ts +++ b/packages/tests/src/lib/service_node.ts @@ -27,7 +27,7 @@ const WAKU_SERVICE_NODE_PARAMS = const NODE_READY_LOG_LINE = "Node setup complete"; export const DOCKER_IMAGE_NAME = - process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.34.0"; + process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0"; const isGoWaku = DOCKER_IMAGE_NAME.includes("go-waku"); @@ -402,15 +402,6 @@ export class ServiceNode { throw `${this.type} container hasn't started`; } } - - public async getExternalWebsocketMultiaddr(): Promise { - if (!this.docker?.container) { - return undefined; - } - const containerIp = this.docker.containerIp; - const peerId = await this.getPeerId(); - return `/ip4/${containerIp}/tcp/${this.websocketPort}/ws/p2p/${peerId}`; - } } export function defaultArgs(): Args { @@ -431,20 +422,3 @@ interface RpcInfoResponse { listenAddresses: string[]; enrUri?: string; } - -export async function verifyServiceNodesConnected( - nodes: ServiceNode[] -): Promise { - for (const node of nodes) { - const peers = await node.peers(); - log.info(`Service node ${node.containerName} peers:`, peers.length); - log.info(`Service node ${node.containerName} peers:`, peers); - - if (nodes.length > 1 && peers.length === 0) { - log.error(`Service node ${node.containerName} has no peers connected`); - return false; - } - } - - return true; -} diff --git a/packages/tests/src/run-tests.js b/packages/tests/src/run-tests.js index 60f3cf6c33..b468bdc00e 100644 --- a/packages/tests/src/run-tests.js +++ b/packages/tests/src/run-tests.js @@ -3,7 +3,7 @@ import { promisify } from "util"; const execAsync = promisify(exec); -const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.34.0"; +const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0"; async function main() { try { @@ -40,15 +40,6 @@ async function main() { mocha.on("exit", (code) => { console.log(`Mocha tests exited with code ${code}`); - try { - execAsync( - `docker ps -q -f "ancestor=${WAKUNODE_IMAGE}" | xargs -r docker stop` - ).catch((error) => { - console.error("Error cleaning up containers:", error); - }); - } catch (error) { - console.error("Error cleaning up containers:", error); - } process.exit(code || 0); }); } diff --git a/packages/tests/src/sync-rln-tree.js b/packages/tests/src/sync-rln-tree.js index a84a40d741..d69c3d1899 100644 --- a/packages/tests/src/sync-rln-tree.js +++ b/packages/tests/src/sync-rln-tree.js @@ -7,7 +7,7 @@ import { ServiceNode } from "./lib/index.js"; const execAsync = promisify(exec); -const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.34.0"; +const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0"; const containerName = "rln_tree"; async function syncRlnTree() { diff --git a/packages/tests/src/utils/nodes.ts b/packages/tests/src/utils/nodes.ts index b5f3512094..c3e8cc7a9b 100644 --- a/packages/tests/src/utils/nodes.ts +++ b/packages/tests/src/utils/nodes.ts @@ -7,13 +7,12 @@ import { Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; -import { delay, derivePubsubTopicsFromNetworkConfig } from "@waku/utils"; +import { derivePubsubTopicsFromNetworkConfig } from "@waku/utils"; import { Context } from "mocha"; import pRetry from "p-retry"; import { NOISE_KEY_1 } from "../constants.js"; import { ServiceNodesFleet } from "../lib/index.js"; -import { verifyServiceNodesConnected } from "../lib/service_node.js"; import { Args } from "../types.js"; import { waitForConnections } from "./waitForConnections.js"; @@ -36,13 +35,6 @@ export async function runMultipleNodes( withoutFilter ); - if (numServiceNodes > 1) { - const success = await verifyServiceNodesConnected(serviceNodes.nodes); - if (!success) { - throw new Error("Failed to verify that service nodes are connected"); - } - } - const wakuOptions: CreateNodeOptions = { staticNoiseKey: NOISE_KEY_1, libp2p: { @@ -58,34 +50,24 @@ export async function runMultipleNodes( throw new Error("Failed to initialize waku"); } - //TODO: reinvestigate the need for these delays with nwaku:0.35.0: https://github.com/waku-org/js-waku/issues/2264 - await delay(2000); - for (const node of serviceNodes.nodes) { await waku.dial(await node.getMultiaddrWithId()); await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); - const success = await node.ensureSubscriptions( + await node.ensureSubscriptions( derivePubsubTopicsFromNetworkConfig(networkConfig) ); - if (!success) { - throw new Error("Failed to ensure subscriptions"); - } - //TODO: reinvestigate the need for these delays with nwaku:0.35.0: https://github.com/waku-org/js-waku/issues/2264 - await delay(2000); + const wakuConnections = waku.libp2p.getConnections(); + + if (wakuConnections.length < 1) { + throw new Error(`Expected at least 1 connection for js-waku.`); + } await node.waitForLog(waku.libp2p.peerId.toString(), 100); } await waitForConnections(numServiceNodes, waku); - const wakuConnections = waku.libp2p.getConnections(); - if (wakuConnections.length < numServiceNodes) { - throw new Error( - `Expected at least ${numServiceNodes} connections for js-waku.` - ); - } - return [serviceNodes, waku]; } diff --git a/packages/tests/src/utils/teardown.ts b/packages/tests/src/utils/teardown.ts index a4fc2573b2..86257ceb41 100644 --- a/packages/tests/src/utils/teardown.ts +++ b/packages/tests/src/utils/teardown.ts @@ -6,8 +6,6 @@ import { ServiceNode } from "../lib/service_node.js"; const log = new Logger("test:teardown"); -const TEARDOWN_TIMEOUT = 10000; // 10 seconds timeout for teardown - export async function tearDownNodes( nwakuNodes: ServiceNode | ServiceNode[], wakuNodes: IWaku | IWaku[] @@ -15,47 +13,37 @@ export async function tearDownNodes( const nNodes = Array.isArray(nwakuNodes) ? nwakuNodes : [nwakuNodes]; const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes]; - try { - // Use Promise.race to implement timeout - const teardownPromise = Promise.all([ - ...nNodes.map(async (nwaku) => { - if (nwaku) { - await pRetry( - async () => { - try { - await nwaku.stop(); - } catch (error) { - log.error("Nwaku failed to stop:", error); - throw error; - } - }, - { retries: 3, minTimeout: 1000 } - ); - } - }), - ...wNodes.map(async (waku) => { - if (waku) { + const stopNwakuNodes = nNodes.map(async (nwaku) => { + if (nwaku) { + await pRetry( + async () => { + try { + await nwaku.stop(); + } catch (error) { + log.error("Nwaku failed to stop:", error); + throw error; + } + }, + { retries: 3 } + ); + } + }); + + const stopWakuNodes = wNodes.map(async (waku) => { + if (waku) { + await pRetry( + async () => { try { await waku.stop(); } catch (error) { log.error("Waku failed to stop:", error); + throw error; } - } - }) - ]); + }, + { retries: 3 } + ); + } + }); - await Promise.race([ - teardownPromise, - new Promise((_, reject) => - setTimeout( - () => reject(new Error("Teardown timeout")), - TEARDOWN_TIMEOUT - ) - ) - ]); - } catch (error) { - log.error("Teardown failed:", error); - // Force process cleanup if needed - process.exit(1); - } + await Promise.all([...stopNwakuNodes, ...stopWakuNodes]); } diff --git a/packages/tests/tests/dns-peer-discovery.spec.ts b/packages/tests/tests/dns-peer-discovery.spec.ts index e05d97fe8c..ad3ab013db 100644 --- a/packages/tests/tests/dns-peer-discovery.spec.ts +++ b/packages/tests/tests/dns-peer-discovery.spec.ts @@ -56,7 +56,7 @@ describe("DNS Node Discovery [live data]", function () { }); it(`should use DNS peer discovery with light client`, async function () { - this.timeout(100_000); + this.timeout(100000); const maxQuantity = 3; const nodeRequirements = { diff --git a/packages/tests/tests/ephemeral.node.spec.ts b/packages/tests/tests/ephemeral.node.spec.ts index 6219b081ad..2abed950d2 100644 --- a/packages/tests/tests/ephemeral.node.spec.ts +++ b/packages/tests/tests/ephemeral.node.spec.ts @@ -23,11 +23,11 @@ import { afterEachCustom, beforeEachCustom, delay, + makeLogFileName, NOISE_KEY_1, NOISE_KEY_2, - runMultipleNodes, - ServiceNodesFleet, - teardownNodesWithRedundancy + ServiceNode, + tearDownNodes } from "../src/index.js"; const log = new Logger("test:ephemeral"); @@ -76,39 +76,41 @@ const SymDecoder = createSymDecoder(SymContentTopic, symKey, PubsubTopic); describe("Waku Message Ephemeral field", function () { let waku: LightNode; - let serviceNodes: ServiceNodesFleet; + let nwaku: ServiceNode; afterEachCustom(this, async () => { - await teardownNodesWithRedundancy(serviceNodes, waku); + await tearDownNodes(nwaku, waku); }); beforeEachCustom(this, async () => { - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - { - clusterId: ClusterId, - contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic] - }, - { - filter: true, - lightpush: true, - store: true, - relay: true, - pubsubTopic: [PubsubTopic] - }, - true, - 2 - ); + nwaku = new ServiceNode(makeLogFileName(this.ctx)); + await nwaku.start({ + filter: true, + lightpush: true, + store: true, + relay: true, + pubsubTopic: [PubsubTopic], + contentTopic: [TestContentTopic, AsymContentTopic, SymContentTopic], + clusterId: ClusterId + }); + await nwaku.ensureSubscriptionsAutosharding([ + TestContentTopic, + AsymContentTopic, + SymContentTopic + ]); - await Promise.all( - serviceNodes.nodes.map(async (node) => { - await node.ensureSubscriptionsAutosharding([ - TestContentTopic, - AsymContentTopic, - SymContentTopic - ]); - }) - ); + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, + networkConfig: { + contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic], + clusterId: ClusterId + } + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + + await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); }); it("Ephemeral messages are not stored", async function () { @@ -128,7 +130,7 @@ describe("Waku Message Ephemeral field", function () { payload: utf8ToBytes(clearText) }; - const [waku1, waku2] = await Promise.all([ + const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ createLightNode({ staticNoiseKey: NOISE_KEY_1, networkConfig: { @@ -142,18 +144,16 @@ describe("Waku Message Ephemeral field", function () { contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic], clusterId: ClusterId } - }).then((waku) => waku.start().then(() => waku)) + }).then((waku) => waku.start().then(() => waku)), + nwaku.getMultiaddrWithId() ]); log.info("Waku nodes created"); - await Promise.all( - serviceNodes.nodes.map(async (node) => { - const multiaddr = await node.getMultiaddrWithId(); - await waku1.dial(multiaddr); - await waku2.dial(multiaddr); - }) - ); + await Promise.all([ + waku1.dial(nimWakuMultiaddr), + waku2.dial(nimWakuMultiaddr) + ]); log.info("Waku nodes connected to nwaku"); @@ -186,7 +186,7 @@ describe("Waku Message Ephemeral field", function () { expect(messages?.length).eq(0); - await teardownNodesWithRedundancy(serviceNodes, [waku1, waku2]); + await tearDownNodes([], [waku1, waku2]); }); it("Ephemeral field is preserved - encoder v0", async function () { diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index 8098fd6745..af281c039c 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -102,7 +102,8 @@ const runTests = (strictCheckNodes: boolean): void => { expectedVersion: 1, expectedPubsubTopic: TestPubsubTopic }); - await serviceNodes.confirmMessageLength(1, { encryptedPayload: true }); + + await serviceNodes.confirmMessageLength(1); }); it("Subscribe and receive symmetrically encrypted messages via lightPush", async function () { @@ -135,7 +136,7 @@ const runTests = (strictCheckNodes: boolean): void => { expectedPubsubTopic: TestPubsubTopic }); - await serviceNodes.confirmMessageLength(1, { encryptedPayload: true }); + await serviceNodes.confirmMessageLength(1); }); it("Subscribe and receive messages via waku relay post", async function () { diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index 060a283982..d9bc508bc5 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -50,7 +50,7 @@ const runTests = (strictNodeCheck: boolean): void => { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(testItem.value) }); - expect(pushResponse.successes.length).to.equal(numServiceNodes); + expect(pushResponse.successes.length).to.eq(numServiceNodes); expect( await serviceNodes.messageCollector.waitForMessages(1, { @@ -65,24 +65,24 @@ const runTests = (strictNodeCheck: boolean): void => { }); }); - it("Push 5 different messages", async function () { + it("Push 30 different messages", async function () { const generateMessageText = (index: number): string => `M${index}`; - for (let i = 0; i < 5; i++) { + for (let i = 0; i < 30; i++) { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(generateMessageText(i)) }); - if (pushResponse.failures.length > 0) console.log(i + 1); - expect(pushResponse.successes.length).to.equal(numServiceNodes); + + expect(pushResponse.successes.length).to.eq(numServiceNodes); } expect( - await serviceNodes.messageCollector.waitForMessages(5, { + await serviceNodes.messageCollector.waitForMessages(30, { pubsubTopic: TestPubsubTopic }) ).to.eq(true); - for (let i = 0; i < 5; i++) { + for (let i = 0; i < 30; i++) { serviceNodes.messageCollector.verifyReceivedMessage(i, { expectedMessageText: generateMessageText(i), expectedContentTopic: TestContentTopic, @@ -119,7 +119,7 @@ const runTests = (strictNodeCheck: boolean): void => { customEncoder, messagePayload ); - expect(pushResponse.successes.length).to.equal(numServiceNodes); + expect(pushResponse.successes.length).to.eq(numServiceNodes); expect( await serviceNodes.messageCollector.waitForMessages(1, { @@ -156,7 +156,7 @@ const runTests = (strictNodeCheck: boolean): void => { customTestEncoder, messagePayload ); - expect(pushResponse.successes.length).to.equal(numServiceNodes); + expect(pushResponse.successes.length).to.eq(numServiceNodes); expect( await serviceNodes.messageCollector.waitForMessages(1, { @@ -190,7 +190,7 @@ const runTests = (strictNodeCheck: boolean): void => { ); if (serviceNodes.type == "go-waku") { - expect(pushResponse.successes.length).to.equal(numServiceNodes); + expect(pushResponse.successes.length).to.eq(numServiceNodes); expect( await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: TestPubsubTopic @@ -229,7 +229,7 @@ const runTests = (strictNodeCheck: boolean): void => { payload: utf8ToBytes(messageText), rateLimitProof: rateLimitProof }); - expect(pushResponse.successes.length).to.equal(numServiceNodes); + expect(pushResponse.successes.length).to.eq(numServiceNodes); expect( await serviceNodes.messageCollector.waitForMessages(1, { @@ -253,7 +253,7 @@ const runTests = (strictNodeCheck: boolean): void => { payload: utf8ToBytes(messageText), timestamp: new Date(testItem) }); - expect(pushResponse.successes.length).to.equal(numServiceNodes); + expect(pushResponse.successes.length).to.eq(numServiceNodes); expect( await serviceNodes.messageCollector.waitForMessages(1, { @@ -274,7 +274,7 @@ const runTests = (strictNodeCheck: boolean): void => { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: bigPayload }); - expect(pushResponse.successes.length).to.equal(numServiceNodes); + expect(pushResponse.successes.length).to.greaterThan(0); }); it("Fails to push message bigger that 1MB", async function () { diff --git a/packages/tests/tests/light-push/single_node/index.node.spec.ts b/packages/tests/tests/light-push/single_node/index.node.spec.ts index af0c3232d6..a8b8835264 100644 --- a/packages/tests/tests/light-push/single_node/index.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/index.node.spec.ts @@ -7,6 +7,7 @@ import { afterEachCustom, beforeEachCustom, generateRandomUint8Array, + MessageCollector, ServiceNode, tearDownNodes, TEST_STRING @@ -21,16 +22,16 @@ import { TestShardInfo } from "../utils.js"; -// These tests are expected to fail as service nodes now require at least one more connected node: https://github.com/waku-org/nwaku/pull/2951/files - -describe("Waku Light Push: Single Node: Fails as expected", function () { +describe("Waku Light Push: Single Node", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level this.timeout(15000); let waku: LightNode; let nwaku: ServiceNode; + let messageCollector: MessageCollector; beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); + messageCollector = new MessageCollector(nwaku); await nwaku.ensureSubscriptions([TestPubsubTopic]); }); @@ -44,9 +45,18 @@ describe("Waku Light Push: Single Node: Fails as expected", function () { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(testItem.value) }); - // Expect failure since node requires another connected node - expect(pushResponse.successes.length).to.eq(0); - expect(pushResponse.failures?.length).to.be.greaterThan(0); + expect(pushResponse.successes.length).to.eq(1); + + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: TestPubsubTopic + }) + ).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: testItem.value, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); }); }); @@ -57,9 +67,73 @@ describe("Waku Light Push: Single Node: Fails as expected", function () { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(generateMessageText(i)) }); - // Expect failure since node requires another connected node - expect(pushResponse.successes.length).to.eq(0); - expect(pushResponse.failures?.length).to.be.greaterThan(0); + expect(pushResponse.successes.length).to.eq(1); + } + + expect( + await messageCollector.waitForMessages(30, { + pubsubTopic: TestPubsubTopic + }) + ).to.eq(true); + + for (let i = 0; i < 30; i++) { + messageCollector.verifyReceivedMessage(i, { + expectedMessageText: generateMessageText(i), + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + } + }); + + it("Throws when trying to push message with empty payload", async function () { + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: new Uint8Array() + }); + + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.map((failure) => failure.error)).to.include( + ProtocolError.EMPTY_PAYLOAD + ); + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: TestPubsubTopic + }) + ).to.eq(false); + }); + + TEST_STRING.forEach((testItem) => { + it(`Push message with content topic containing ${testItem.description}`, async function () { + const customEncoder = createEncoder({ + contentTopic: testItem.value, + pubsubTopic: TestPubsubTopic + }); + const pushResponse = await waku.lightPush.send( + customEncoder, + messagePayload + ); + expect(pushResponse.successes.length).to.eq(1); + + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: TestPubsubTopic + }) + ).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: testItem.value, + expectedPubsubTopic: TestPubsubTopic + }); + }); + }); + + it("Fails to push message with empty content topic", async function () { + try { + createEncoder({ contentTopic: "" }); + expect.fail("Expected an error but didn't get one"); + } catch (error) { + expect((error as Error).message).to.equal( + "Content topic must be specified" + ); } }); @@ -74,19 +148,62 @@ describe("Waku Light Push: Single Node: Fails as expected", function () { customTestEncoder, messagePayload ); - expect(pushResponse.successes.length).to.eq(0); - expect(pushResponse.failures?.length).to.be.greaterThan(0); + expect(pushResponse.successes.length).to.eq(1); + + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: TestPubsubTopic + }) + ).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); }); - it("Fails to push message with empty payload", async function () { - const pushResponse = await waku.lightPush.send(TestEncoder, { - payload: new Uint8Array() + it("Fails to push message with large meta", async function () { + const customTestEncoder = createEncoder({ + contentTopic: TestContentTopic, + pubsubTopic: TestPubsubTopic, + metaSetter: () => new Uint8Array(105024) // see the note below *** }); - expect(pushResponse.successes.length).to.eq(0); - expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - ProtocolError.EMPTY_PAYLOAD + // *** note: this test used 10 ** 6 when `nwaku` node had MaxWakuMessageSize == 1MiB ( 1*2^20 .) + // `nwaku` establishes the max lightpush msg size as `const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024` + // see: https://github.com/waku-org/nwaku/blob/07beea02095035f4f4c234ec2dec1f365e6955b8/waku/waku_lightpush/rpc_codec.nim#L15 + // In the PR https://github.com/waku-org/nwaku/pull/2298 we reduced the MaxWakuMessageSize + // from 1MiB to 150KiB. Therefore, the 105024 number comes from substracting ( 1*2^20 - 150*2^10 ) + // to the original 10^6 that this test had when MaxWakuMessageSize == 1*2^20 + + const pushResponse = await waku.lightPush.send( + customTestEncoder, + messagePayload ); + + if (nwaku.type == "go-waku") { + expect(pushResponse.successes.length).to.eq(1); + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: TestPubsubTopic + }) + ).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + } else { + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.map((failure) => failure.error)).to.include( + ProtocolError.REMOTE_PEER_REJECTED + ); + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: TestPubsubTopic + }) + ).to.eq(false); + } }); it("Push message with rate limit", async function () { @@ -104,8 +221,18 @@ describe("Waku Light Push: Single Node: Fails as expected", function () { payload: utf8ToBytes(messageText), rateLimitProof: rateLimitProof }); - expect(pushResponse.successes.length).to.eq(0); - expect(pushResponse.failures?.length).to.be.greaterThan(0); + expect(pushResponse.successes.length).to.eq(1); + + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: TestPubsubTopic + }) + ).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); }); [ @@ -118,8 +245,19 @@ describe("Waku Light Push: Single Node: Fails as expected", function () { payload: utf8ToBytes(messageText), timestamp: new Date(testItem) }); - expect(pushResponse.successes.length).to.eq(0); - expect(pushResponse.failures?.length).to.be.greaterThan(0); + expect(pushResponse.successes.length).to.eq(1); + + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: TestPubsubTopic + }) + ).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedTimestamp: testItem, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); }); }); @@ -128,8 +266,7 @@ describe("Waku Light Push: Single Node: Fails as expected", function () { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: bigPayload }); - expect(pushResponse.successes.length).to.eq(0); - expect(pushResponse.failures?.length).to.be.greaterThan(0); + expect(pushResponse.successes.length).to.greaterThan(0); }); it("Fails to push message bigger that 1MB", async function () { @@ -142,5 +279,10 @@ describe("Waku Light Push: Single Node: Fails as expected", function () { expect(pushResponse.failures?.map((failure) => failure.error)).to.include( ProtocolError.SIZE_TOO_BIG ); + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: TestPubsubTopic + }) + ).to.eq(false); }); }); diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index 7e5fdd71fe..02e6c7668a 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -16,20 +16,24 @@ import { } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; +import { Context } from "mocha"; import { afterEachCustom, beforeEachCustom, - runMultipleNodes, - ServiceNodesFleet, + makeLogFileName, + MessageCollector, + ServiceNode, tearDownNodes } from "../../../src/index.js"; -import { messageText } from "../utils.js"; +import { messageText, runNodes } from "../utils.js"; describe("Waku Light Push : Multiple PubsubTopics", function () { this.timeout(30000); let waku: LightNode; - let serviceNodes: ServiceNodesFleet; + let nwaku: ServiceNode; + let nwaku2: ServiceNode; + let messageCollector: MessageCollector; const shardInfo: ShardInfo = { clusterId: 3, shards: [1, 2] }; const singleShardInfo1: SingleShardInfo = { clusterId: 3, shard: 1 }; @@ -51,19 +55,13 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { let node1PeerId: PeerId; beforeEachCustom(this, async () => { - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - shardInfo, - undefined, - true, - 2, - true - ); - node1PeerId = await serviceNodes.nodes[0].getPeerId(); + [nwaku, waku] = await runNodes(this.ctx, shardInfo); + messageCollector = new MessageCollector(nwaku); + node1PeerId = await nwaku.getPeerId(); }); afterEachCustom(this, async () => { - await tearDownNodes(serviceNodes.nodes, waku); + await tearDownNodes([nwaku, nwaku2], waku); }); it("Push message on custom pubsubTopic", async function () { @@ -74,11 +72,11 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect( - await serviceNodes.messageCollector.waitForMessages(1, { + await messageCollector.waitForMessages(1, { pubsubTopic: customPubsubTopic1 }) ).to.eq(true); - serviceNodes.messageCollector.verifyReceivedMessage(0, { + messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, expectedContentTopic: customContentTopic1 }); @@ -94,48 +92,50 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); + const messageCollector2 = new MessageCollector(nwaku); + expect( - await serviceNodes.messageCollector.waitForMessages(1, { + await messageCollector.waitForMessages(1, { pubsubTopic: customPubsubTopic1 }) ).to.eq(true); expect( - await serviceNodes.messageCollector.waitForMessages(1, { + await messageCollector2.waitForMessages(1, { pubsubTopic: customPubsubTopic2 }) ).to.eq(true); - serviceNodes.messageCollector.verifyReceivedMessage(0, { + messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: customPubsubTopic1 }); - serviceNodes.messageCollector.verifyReceivedMessage(1, { + messageCollector2.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: customPubsubTopic2 }); }); - it("Light push messages to 2 service nodes each with different pubsubtopics", async function () { - const [serviceNodes2, waku2] = await runMultipleNodes( - this.ctx, - { - clusterId: singleShardInfo2.clusterId, - shards: [singleShardInfo2.shard!] - }, - undefined, - true, - 1 - ); - - await serviceNodes2.nodes[0].ensureSubscriptions([ + it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { + // Set up and start a new nwaku node with Default PubsubTopic + nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + pubsubTopic: [singleShardInfoToPubsubTopic(singleShardInfo2)], + clusterId: singleShardInfo2.clusterId + }); + await nwaku2.ensureSubscriptions([ singleShardInfoToPubsubTopic(singleShardInfo2) ]); - await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId()); + await waku.dial(await nwaku2.getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); + const messageCollector2 = new MessageCollector(nwaku2); + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); @@ -143,33 +143,33 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { payload: utf8ToBytes("M2") }); - await serviceNodes.messageCollector.waitForMessages(1, { + await messageCollector.waitForMessages(1, { pubsubTopic: customPubsubTopic1 }); - await serviceNodes2.messageCollector.waitForMessages(1, { + + await messageCollector2.waitForMessages(1, { pubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) }); - serviceNodes.messageCollector.verifyReceivedMessage(0, { + messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: customPubsubTopic1 }); - serviceNodes2.messageCollector.verifyReceivedMessage(0, { + messageCollector2.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) }); - - // Clean up second fleet - await tearDownNodes(serviceNodes2.nodes, waku2); }); }); describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { this.timeout(30000); let waku: LightNode; - let serviceNodes: ServiceNodesFleet; + let nwaku: ServiceNode; + let nwaku2: ServiceNode; + let messageCollector: MessageCollector; const clusterId = 4; const customContentTopic1 = "/waku/2/content/test.js"; @@ -198,19 +198,13 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { let node1PeerId: PeerId; beforeEachCustom(this, async () => { - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - shardInfo, - undefined, - true, - 2, - true - ); - node1PeerId = await serviceNodes.nodes[0].getPeerId(); + [nwaku, waku] = await runNodes(this.ctx, shardInfo); + messageCollector = new MessageCollector(nwaku); + node1PeerId = await nwaku.getPeerId(); }); afterEachCustom(this, async () => { - await tearDownNodes(serviceNodes.nodes, waku); + await tearDownNodes([nwaku, nwaku2], waku); }); it("Push message on custom pubsubTopic", async function () { @@ -218,14 +212,15 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { payload: utf8ToBytes(messageText) }); + expect(pushResponse.failures).to.be.empty; expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect( - await serviceNodes.messageCollector.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic1 + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 }) ).to.eq(true); - serviceNodes.messageCollector.verifyReceivedMessage(0, { + messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, expectedContentTopic: customContentTopic1 }); @@ -241,46 +236,48 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); + const messageCollector2 = new MessageCollector(nwaku); + expect( - await serviceNodes.messageCollector.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic1 + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 }) ).to.eq(true); expect( - await serviceNodes.messageCollector.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic2 + await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic2 }) ).to.eq(true); - serviceNodes.messageCollector.verifyReceivedMessage(0, { + messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: autoshardingPubsubTopic1 }); - serviceNodes.messageCollector.verifyReceivedMessage(1, { + messageCollector2.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: autoshardingPubsubTopic2 }); }); - it("Light push messages to 2 service nodes each with different pubsubtopics", async function () { - // Create a second fleet for the second pubsub topic - const [serviceNodes2, waku2] = await runMultipleNodes( - this.ctx, - { clusterId, contentTopics: [customContentTopic2] }, - undefined, - true, - 1 // Only need one node for second fleet - ); - - await serviceNodes2.nodes[0].ensureSubscriptionsAutosharding([ - customContentTopic2 - ]); - await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId()); + it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { + // Set up and start a new nwaku node with Default PubsubTopic + nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + pubsubTopic: [autoshardingPubsubTopic2], + clusterId: shardInfo.clusterId + }); + await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); + await waku.dial(await nwaku2.getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); + const messageCollector2 = new MessageCollector(nwaku2); + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); @@ -288,33 +285,34 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { payload: utf8ToBytes("M2") }); - await serviceNodes.messageCollector.waitForMessagesAutosharding(1, { + await messageCollector.waitForMessagesAutosharding(1, { contentTopic: customContentTopic1 }); - await serviceNodes2.messageCollector.waitForMessagesAutosharding(1, { + await messageCollector2.waitForMessagesAutosharding(1, { contentTopic: customContentTopic2 }); - serviceNodes.messageCollector.verifyReceivedMessage(0, { + messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: autoshardingPubsubTopic1 }); - serviceNodes2.messageCollector.verifyReceivedMessage(0, { + messageCollector2.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: autoshardingPubsubTopic2 }); - - // Clean up second fleet - await tearDownNodes(serviceNodes2.nodes, waku2); }); }); describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () { this.timeout(30000); let waku: LightNode; - let serviceNodes: ServiceNodesFleet; + let waku2: LightNode; + let nwaku: ServiceNode; + let nwaku2: ServiceNode; + let messageCollector: MessageCollector; + let ctx: Context; const clusterId = 3; const customContentTopic1 = "/waku/2/content/utf8"; @@ -357,19 +355,14 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () let node1PeerId: PeerId; beforeEachCustom(this, async () => { - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - testShardInfo, - undefined, - true, - 2, - true - ); - node1PeerId = await serviceNodes.nodes[0].getPeerId(); + ctx = this.ctx; + [nwaku, waku] = await runNodes(ctx, testShardInfo); + messageCollector = new MessageCollector(nwaku); + node1PeerId = await nwaku.getPeerId(); }); afterEachCustom(this, async () => { - await tearDownNodes(serviceNodes.nodes, waku); + await tearDownNodes([nwaku, nwaku2], [waku, waku2]); }); it("Push message on custom pubsubTopic", async function () { @@ -380,11 +373,11 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect( - await serviceNodes.messageCollector.waitForMessages(1, { + await messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic1 }) ).to.eq(true); - serviceNodes.messageCollector.verifyReceivedMessage(0, { + messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, expectedContentTopic: customContentTopic1 }); @@ -400,71 +393,68 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); + const messageCollector2 = new MessageCollector(nwaku); + expect( - await serviceNodes.messageCollector.waitForMessages(1, { + await messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic1 }) ).to.eq(true); expect( - await serviceNodes.messageCollector.waitForMessages(1, { + await messageCollector2.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic2 }) ).to.eq(true); - serviceNodes.messageCollector.verifyReceivedMessage(0, { + messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: autoshardingPubsubTopic1 }); - serviceNodes.messageCollector.verifyReceivedMessage(1, { + messageCollector2.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: autoshardingPubsubTopic2 }); }); - it("Light push messages to 2 service nodes each with different pubsubtopics", async function () { - const [serviceNodes2, waku2] = await runMultipleNodes( - this.ctx, - shardInfo2, - undefined, - true, - 1 - ); + it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { + // Set up and start a new nwaku node with Default PubsubTopic + [nwaku2] = await runNodes(ctx, shardInfo2); - await serviceNodes2.nodes[0].ensureSubscriptions([ - autoshardingPubsubTopic2 - ]); - await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId()); + await nwaku2.ensureSubscriptions([autoshardingPubsubTopic2]); + await waku.dial(await nwaku2.getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); - await waku.lightPush.send(customEncoder1, { + const messageCollector2 = new MessageCollector(nwaku2); + + const { failures: f1 } = await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(customEncoder2, { + const { failures: f2 } = await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - await serviceNodes.messageCollector.waitForMessages(1, { + expect(f1).to.be.empty; + expect(f2).to.be.empty; + + await messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic1 }); - await serviceNodes2.messageCollector.waitForMessages(1, { + await messageCollector2.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic2 }); - serviceNodes.messageCollector.verifyReceivedMessage(0, { + messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: autoshardingPubsubTopic1 }); - serviceNodes2.messageCollector.verifyReceivedMessage(0, { + messageCollector2.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: autoshardingPubsubTopic2 }); - - // Clean up second fleet - await tearDownNodes(serviceNodes2.nodes, waku2); }); }); diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index 37410df49d..e2cdca6d41 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -1,4 +1,4 @@ -import { LightNode, ProtocolError } from "@waku/interfaces"; +import { LightNode, ProtocolError, Protocols } from "@waku/interfaces"; import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; import { contentTopicToPubsubTopic, @@ -8,8 +8,10 @@ import { expect } from "chai"; import { afterEachCustom, - runMultipleNodes, - ServiceNodesFleet, + beforeEachCustom, + makeLogFileName, + MessageCollector, + ServiceNode, tearDownNodes } from "../../src/index.js"; @@ -20,33 +22,41 @@ describe("Autosharding: Running Nodes", function () { this.timeout(50000); const clusterId = 10; let waku: LightNode; - let serviceNodes: ServiceNodesFleet; + let nwaku: ServiceNode; + let messageCollector: MessageCollector; + + beforeEachCustom(this, async () => { + nwaku = new ServiceNode(makeLogFileName(this.ctx)); + messageCollector = new MessageCollector(nwaku); + }); afterEachCustom(this, async () => { - await tearDownNodes(serviceNodes.nodes, waku); + await tearDownNodes(nwaku, waku); }); describe("Different clusters and topics", function () { + // js-waku allows autosharding for cluster IDs different than 1 it("Cluster ID 0 - Default/Global Cluster", async function () { const clusterId = 0; const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics + }); - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - { - clusterId, + await nwaku.ensureSubscriptions(pubsubTopics); + + waku = await createLightNode({ + networkConfig: { + clusterId: clusterId, contentTopics: [ContentTopic] - }, - { - store: true, - lightpush: true, - relay: true, - pubsubTopic: pubsubTopics - }, - false, - 2, - true - ); + } + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.LightPush]); const encoder = createEncoder({ contentTopic: ContentTopic, @@ -60,9 +70,9 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.successes.length).to.eq(2); // Expect 2 successes for 2 nodes + expect(request.successes.length).to.eq(1); expect( - await serviceNodes.messageCollector.waitForMessagesAutosharding(1, { + await messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic }) ).to.eq(true); @@ -71,23 +81,24 @@ describe("Autosharding: Running Nodes", function () { it("Non TWN Cluster", async function () { const clusterId = 5; const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics + }); - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - { - clusterId, + await nwaku.ensureSubscriptions(pubsubTopics); + + waku = await createLightNode({ + networkConfig: { + clusterId: clusterId, contentTopics: [ContentTopic] - }, - { - store: true, - lightpush: true, - relay: true, - pubsubTopic: pubsubTopics - }, - false, - 2, - true - ); + } + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.LightPush]); const encoder = createEncoder({ contentTopic: ContentTopic, @@ -101,9 +112,9 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.successes.length).to.eq(2); // Expect 2 successes for 2 nodes + expect(request.successes.length).to.eq(1); expect( - await serviceNodes.messageCollector.waitForMessagesAutosharding(1, { + await messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic }) ).to.eq(true); @@ -127,23 +138,24 @@ describe("Autosharding: Running Nodes", function () { contentTopicToPubsubTopic(ContentTopic, clusterId) ]; - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - { - clusterId, + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic] + }); + + waku = await createLightNode({ + networkConfig: { + clusterId: clusterId, contentTopics: [ContentTopic] - }, - { - store: true, - lightpush: true, - relay: true, - pubsubTopic: pubsubTopics, - contentTopic: [ContentTopic] - }, - false, - 2, - true - ); + } + }); + + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.LightPush]); const encoder = createEncoder({ contentTopic: ContentTopic, @@ -157,9 +169,9 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.successes.length).to.eq(2); // Expect 2 successes for 2 nodes + expect(request.successes.length).to.eq(1); expect( - await serviceNodes.messageCollector.waitForMessagesAutosharding(1, { + await messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic }) ).to.eq(true); @@ -189,23 +201,24 @@ describe("Autosharding: Running Nodes", function () { contentTopicToPubsubTopic(ContentTopic2, clusterId) ]; - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - { - clusterId, + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic, ContentTopic2] + }); + + waku = await createLightNode({ + networkConfig: { + clusterId: clusterId, + // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards contentTopics: [ContentTopic, ContentTopic2] - }, - { - store: true, - lightpush: true, - relay: true, - pubsubTopic: pubsubTopics, - contentTopic: [ContentTopic, ContentTopic2] - }, - false, - 2, - true - ); + } + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.LightPush]); const encoder1 = createEncoder({ contentTopic: ContentTopic, @@ -226,9 +239,9 @@ describe("Autosharding: Running Nodes", function () { const request1 = await waku.lightPush.send(encoder1, { payload: utf8ToBytes("Hello World") }); - expect(request1.successes.length).to.eq(2); // Expect 2 successes for 2 nodes + expect(request1.successes.length).to.eq(1); expect( - await serviceNodes.messageCollector.waitForMessagesAutosharding(1, { + await messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic }) ).to.eq(true); @@ -236,9 +249,9 @@ describe("Autosharding: Running Nodes", function () { const request2 = await waku.lightPush.send(encoder2, { payload: utf8ToBytes("Hello World") }); - expect(request2.successes.length).to.eq(2); // Expect 2 successes for 2 nodes + expect(request2.successes.length).to.eq(1); expect( - await serviceNodes.messageCollector.waitForMessagesAutosharding(1, { + await messageCollector.waitForMessagesAutosharding(1, { contentTopic: ContentTopic }) ).to.eq(true); @@ -246,24 +259,21 @@ describe("Autosharding: Running Nodes", function () { it("using a protocol with unconfigured pubsub topic should fail", async function () { const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: pubsubTopics, + contentTopic: [ContentTopic] + }); - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - { - clusterId, + waku = await createLightNode({ + networkConfig: { + clusterId: clusterId, contentTopics: [ContentTopic] - }, - { - store: true, - lightpush: true, - relay: true, - pubsubTopic: pubsubTopics, - contentTopic: [ContentTopic] - }, - false, - 2, - true - ); + } + }); // use a content topic that is not configured const encoder = createEncoder({ @@ -278,17 +288,17 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - if (successes.length > 0 || !failures?.length) { + if (successes.length > 0 || failures?.length === 0) { throw new Error("The request should've thrown an error"); } - const errors = failures.map((failure) => failure.error); + const errors = failures?.map((failure) => failure.error); expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); }); it("start node with empty content topic", async function () { try { - await createLightNode({ + waku = await createLightNode({ networkConfig: { clusterId: clusterId, contentTopics: [] diff --git a/packages/tests/tests/sharding/static_sharding.spec.ts b/packages/tests/tests/sharding/static_sharding.spec.ts index 61db888be5..f4cdccb4ee 100644 --- a/packages/tests/tests/sharding/static_sharding.spec.ts +++ b/packages/tests/tests/sharding/static_sharding.spec.ts @@ -1,6 +1,7 @@ import { LightNode, ProtocolError, + Protocols, ShardInfo, SingleShardInfo } from "@waku/interfaces"; @@ -14,20 +15,28 @@ import { expect } from "chai"; import { afterEachCustom, - runMultipleNodes, - ServiceNodesFleet, + beforeEachCustom, + makeLogFileName, + MessageCollector, + ServiceNode, tearDownNodes } from "../../src/index.js"; const ContentTopic = "/waku/2/content/test.js"; describe("Static Sharding: Running Nodes", function () { - this.timeout(60_000); + this.timeout(15_000); let waku: LightNode; - let serviceNodes: ServiceNodesFleet; + let nwaku: ServiceNode; + let messageCollector: MessageCollector; + + beforeEachCustom(this, async () => { + nwaku = new ServiceNode(makeLogFileName(this.ctx)); + messageCollector = new MessageCollector(nwaku); + }); afterEachCustom(this, async () => { - await tearDownNodes(serviceNodes.nodes, waku); + await tearDownNodes(nwaku, waku); }); describe("Different clusters and shards", function () { @@ -35,20 +44,20 @@ describe("Static Sharding: Running Nodes", function () { const singleShardInfo = { clusterId: 0, shard: 0 }; const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - shardInfo, - { - store: true, - lightpush: true, - relay: true, - pubsubTopic: shardInfoToPubsubTopics(shardInfo), - clusterId: singleShardInfo.clusterId - }, - false, - 2, - true - ); + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }); + + await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo)); + + waku = await createLightNode({ + networkConfig: shardInfo + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.LightPush]); const encoder = createEncoder({ contentTopic: ContentTopic, @@ -62,32 +71,33 @@ describe("Static Sharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.successes.length).to.eq(2); + expect(request.successes.length).to.eq(1); expect( - await serviceNodes.messageCollector.waitForMessages(1, { + await messageCollector.waitForMessages(1, { pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] }) ).to.eq(true); }); + // dedicated test for Default Cluster ID 0 it("Cluster ID 0 - Default/Global Cluster", async function () { const singleShardInfo = { clusterId: 0, shard: 1 }; const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - shardInfo, - { - store: true, - lightpush: true, - relay: true, - pubsubTopic: shardInfoToPubsubTopics(shardInfo), - clusterId: singleShardInfo.clusterId - }, - false, - 2, - true - ); + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }); + + await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo)); + + waku = await createLightNode({ + networkConfig: shardInfo + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.LightPush]); const encoder = createEncoder({ contentTopic: ContentTopic, @@ -98,9 +108,9 @@ describe("Static Sharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.successes.length).to.eq(2); + expect(request.successes.length).to.eq(1); expect( - await serviceNodes.messageCollector.waitForMessages(1, { + await messageCollector.waitForMessages(1, { pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] }) ).to.eq(true); @@ -110,29 +120,33 @@ describe("Static Sharding: Running Nodes", function () { for (let i = 0; i < numTest; i++) { // Random clusterId between 2 and 1000 const clusterId = Math.floor(Math.random() * 999) + 2; + // Random shardId between 1 and 1000 const shardId = Math.floor(Math.random() * 1000) + 1; it(`random static sharding ${ i + 1 } - Cluster ID: ${clusterId}, Shard ID: ${shardId}`, async function () { + afterEach(async () => { + await tearDownNodes(nwaku, waku); + }); + const singleShardInfo = { clusterId: clusterId, shard: shardId }; const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - shardInfo, - { - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo) - }, - false, - 2, - true - ); + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }); + + waku = await createLightNode({ + networkConfig: shardInfo + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.LightPush]); const encoder = createEncoder({ contentTopic: ContentTopic, @@ -143,9 +157,9 @@ describe("Static Sharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.successes.length).to.eq(2); + expect(request.successes.length).to.eq(1); expect( - await serviceNodes.messageCollector.waitForMessages(1, { + await messageCollector.waitForMessages(1, { pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] }) ).to.eq(true); @@ -155,14 +169,12 @@ describe("Static Sharding: Running Nodes", function () { describe("Others", function () { const clusterId = 2; + let shardInfo: ShardInfo; + const shardInfoFirstShard: ShardInfo = { clusterId: clusterId, shards: [2] }; - const shardInfoSecondShard: ShardInfo = { - clusterId: clusterId, - shards: [3] - }; const shardInfoBothShards: ShardInfo = { clusterId: clusterId, shards: [2, 3] @@ -176,21 +188,31 @@ describe("Static Sharding: Running Nodes", function () { shard: 3 }; + beforeEachCustom(this, async () => { + shardInfo = { + clusterId: clusterId, + shards: [2] + }; + + await nwaku.start({ + store: true, + lightpush: true, + relay: true, + clusterId: clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }); + }); + + afterEachCustom(this, async () => { + await tearDownNodes(nwaku, waku); + }); + it("configure the node with multiple pubsub topics", async function () { - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - shardInfoBothShards, - { - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfoBothShards) - }, - false, - 2, - true - ); + waku = await createLightNode({ + networkConfig: shardInfoBothShards + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.LightPush]); const encoder1 = createEncoder({ contentTopic: ContentTopic, @@ -205,40 +227,29 @@ describe("Static Sharding: Running Nodes", function () { const request1 = await waku.lightPush.send(encoder1, { payload: utf8ToBytes("Hello World2") }); - expect(request1.successes.length).to.eq(2); + expect(request1.successes.length).to.eq(1); expect( - await serviceNodes.messageCollector.waitForMessages(1, { - pubsubTopic: shardInfoToPubsubTopics(shardInfoFirstShard)[0] + await messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] }) ).to.eq(true); const request2 = await waku.lightPush.send(encoder2, { payload: utf8ToBytes("Hello World3") }); - expect(request2.successes.length).to.eq(2); + expect(request2.successes.length).to.eq(1); expect( - await serviceNodes.messageCollector.waitForMessages(1, { - pubsubTopic: shardInfoToPubsubTopics(shardInfoSecondShard)[0] + await messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] }) ).to.eq(true); }); it("using a protocol with unconfigured pubsub topic should fail", async function () { this.timeout(15_000); - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - shardInfoFirstShard, - { - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfoFirstShard) - }, - false, - 2, - true - ); + waku = await createLightNode({ + networkConfig: shardInfoFirstShard + }); // use a pubsub topic that is not configured const encoder = createEncoder({ @@ -250,17 +261,17 @@ describe("Static Sharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - if (successes.length > 0 || !failures?.length) { + if (successes.length > 0 || failures?.length === 0) { throw new Error("The request should've thrown an error"); } - const errors = failures.map((failure) => failure.error); + const errors = failures?.map((failure) => failure.error); expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); }); it("start node with empty shard should fail", async function () { try { - await createLightNode({ + waku = await createLightNode({ networkConfig: { clusterId: clusterId, shards: [] } }); throw new Error( diff --git a/packages/tests/tests/store/index.node.spec.ts b/packages/tests/tests/store/index.node.spec.ts index c95acffc86..d9a055fb3e 100644 --- a/packages/tests/tests/store/index.node.spec.ts +++ b/packages/tests/tests/store/index.node.spec.ts @@ -304,10 +304,13 @@ describe("Waku Store, general", function () { for await (const msg of query) { if (msg) { messages.push(msg as DecodedMessage); + console.log(bytesToUtf8(msg.payload!)); } } } + console.log(messages.length); + // Messages are ordered from oldest to latest within a page (1 page query) expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index fb4303a25e..99878acb96 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -1,24 +1,26 @@ import { createDecoder } from "@waku/core"; -import type { AutoSharding, IMessage, LightNode } from "@waku/interfaces"; -import { Protocols } from "@waku/sdk"; -import { contentTopicToPubsubTopic } from "@waku/utils"; +import type { ContentTopicInfo, IMessage, LightNode } from "@waku/interfaces"; +import { createLightNode, Protocols } from "@waku/sdk"; +import { + contentTopicToPubsubTopic, + pubsubTopicToSingleShardInfo +} from "@waku/utils"; import { expect } from "chai"; import { afterEachCustom, beforeEachCustom, makeLogFileName, - runMultipleNodes, + NOISE_KEY_1, ServiceNode, - ServiceNodesFleet, - tearDownNodes, - teardownNodesWithRedundancy + tearDownNodes } from "../../src/index.js"; import { processQueriedMessages, runStoreNodes, sendMessages, + sendMessagesAutosharding, TestDecoder, TestDecoder2, TestShardInfo, @@ -154,7 +156,8 @@ describe("Waku Store, custom pubsub topic", function () { describe("Waku Store (Autosharding), custom pubsub topic", function () { this.timeout(15000); let waku: LightNode; - let serviceNodes: ServiceNodesFleet; + let nwaku: ServiceNode; + let nwaku2: ServiceNode; const customContentTopic1 = "/waku/2/content/utf8"; const customContentTopic2 = "/myapp/1/latest/proto"; @@ -167,35 +170,29 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { customContentTopic2, clusterId ); - const customDecoder1 = createDecoder(customContentTopic1, { clusterId: 5 }); - const customDecoder2 = createDecoder(customContentTopic2, { clusterId: 5 }); - const contentTopicInfoBothShards: AutoSharding = { + const customDecoder1 = createDecoder( + customContentTopic1, + pubsubTopicToSingleShardInfo(autoshardingPubsubTopic1) + ); + const customDecoder2 = createDecoder( + customContentTopic2, + pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) + ); + const contentTopicInfoBothShards: ContentTopicInfo = { clusterId, contentTopics: [customContentTopic1, customContentTopic2] }; beforeEachCustom(this, async () => { - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - contentTopicInfoBothShards, - { store: true } - ); + [nwaku, waku] = await runStoreNodes(this.ctx, contentTopicInfoBothShards); }); afterEachCustom(this, async () => { - await teardownNodesWithRedundancy(serviceNodes, waku); + await tearDownNodes([nwaku, nwaku2], waku); }); it("Generator, custom pubsub topic", async function () { - for (let i = 0; i < totalMsgs; i++) { - await serviceNodes.sendRelayMessage( - ServiceNode.toMessageRpcQuery({ - payload: new Uint8Array([0]), - contentTopic: customContentTopic1 - }), - autoshardingPubsubTopic1 - ); - } + await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); const messages = await processQueriedMessages( waku, @@ -214,22 +211,8 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { this.timeout(10000); const totalMsgs = 10; - for (let i = 0; i < totalMsgs; i++) { - await serviceNodes.sendRelayMessage( - ServiceNode.toMessageRpcQuery({ - payload: new Uint8Array([i]), - contentTopic: customContentTopic1 - }), - autoshardingPubsubTopic1 - ); - await serviceNodes.sendRelayMessage( - ServiceNode.toMessageRpcQuery({ - payload: new Uint8Array([i]), - contentTopic: customContentTopic2 - }), - autoshardingPubsubTopic2 - ); - } + await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); + await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic2); const customMessages = await processQueriedMessages( waku, @@ -253,6 +236,54 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { }); expect(result2).to.not.eq(-1); }); + + it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () { + this.timeout(10000); + + // Set up and start a new nwaku node with Default Pubsubtopic + nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + store: true, + pubsubTopic: [autoshardingPubsubTopic2], + contentTopic: [customContentTopic2], + relay: true, + clusterId + }); + await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); + + const totalMsgs = 10; + await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); + await sendMessagesAutosharding(nwaku2, totalMsgs, customContentTopic2); + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1, + networkConfig: contentTopicInfoBothShards + }); + await waku.start(); + + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.Store]); + + let customMessages: IMessage[] = []; + let testMessages: IMessage[] = []; + + while ( + customMessages.length != totalMsgs || + testMessages.length != totalMsgs + ) { + customMessages = await processQueriedMessages( + waku, + [customDecoder1], + autoshardingPubsubTopic1 + ); + testMessages = await processQueriedMessages( + waku, + [customDecoder2], + autoshardingPubsubTopic2 + ); + } + }); }); describe("Waku Store (named sharding), custom pubsub topic", function () {