From 5d8cfff7ebc9c705423e5f19e5cc67d298c7db7b Mon Sep 17 00:00:01 2001
From: fbarbu15
Date: Mon, 14 Jul 2025 11:38:42 +0300
Subject: [PATCH] chore: add throughput reliability tests (#2444)

* chore: add throughput reliability tests

* chore: add capability to run all
---
 .github/workflows/test-reliability.yml        |  64 +++++-
 package.json                                  |   2 +
 packages/reliability-tests/package.json       |   2 +
 .../tests/high-throughput.spec.ts             | 165 +++++++++++++++
 .../reliability-tests/tests/longevity.spec.ts |   2 +-
 .../tests/throughput-sizes.spec.ts            | 193 ++++++++++++++++++
 6 files changed, 422 insertions(+), 6 deletions(-)
 create mode 100644 packages/reliability-tests/tests/high-throughput.spec.ts
 create mode 100644 packages/reliability-tests/tests/throughput-sizes.spec.ts

diff --git a/.github/workflows/test-reliability.yml b/.github/workflows/test-reliability.yml
index 8f57fc9480..ba55f6bc41 100644
--- a/.github/workflows/test-reliability.yml
+++ b/.github/workflows/test-reliability.yml
@@ -2,20 +2,33 @@ name: Run Reliability Test
 
 on:
   workflow_dispatch:
-    push:
-      branches:
-        - "chore/longevity-tests"
+    inputs:
+      test_type:
+        description: 'Type of reliability test to run'
+        required: true
+        default: 'longevity'
+        type: choice
+        options:
+          - longevity
+          - high-throughput
+          - throughput-sizes
+          - all
 
 env:
   NODE_JS: "22"
 
 jobs:
-  node:
+  test:
     runs-on: ubuntu-latest
     permissions:
       contents: read
       actions: read
       checks: write
+    strategy:
+      matrix:
+        test_type: [longevity, high-throughput, throughput-sizes]
+      fail-fast: false
+    if: ${{ github.event.inputs.test_type == 'all' }}
     steps:
       - uses: actions/checkout@v3
         with:
@@ -34,4 +47,45 @@ jobs:
 
       - name: Run tests
         timeout-minutes: 150
-        run: npm run test:longevity
+        run: |
+          if [ "${{ matrix.test_type }}" = "high-throughput" ]; then
+            npm run test:high-throughput
+          elif [ "${{ matrix.test_type }}" = "throughput-sizes" ]; then
+            npm run test:throughput-sizes
+          else
+            npm run test:longevity
+          fi
+
+  single-test:
+    runs-on: ubuntu-latest
+    permissions:
+      contents: read
+      actions: read
+      checks: write
+    if: ${{ github.event.inputs.test_type != 'all' }}
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          repository: waku-org/js-waku
+
+      - name: Remove unwanted software
+        uses: ./.github/actions/prune-vm
+
+      - uses: actions/setup-node@v3
+        with:
+          node-version: ${{ env.NODE_JS }}
+
+      - uses: ./.github/actions/npm
+
+      - run: npm run build:esm
+
+      - name: Run tests
+        timeout-minutes: 150
+        run: |
+          if [ "${{ github.event.inputs.test_type }}" = "high-throughput" ]; then
+            npm run test:high-throughput
+          elif [ "${{ github.event.inputs.test_type }}" = "throughput-sizes" ]; then
+            npm run test:throughput-sizes
+          else
+            npm run test:longevity
+          fi
diff --git a/package.json b/package.json
index cc52f4062c..6ad5fcf0bb 100644
--- a/package.json
+++ b/package.json
@@ -34,6 +34,8 @@
     "test:browser": "NODE_ENV=test npm run test:browser --workspaces --if-present",
     "test:node": "NODE_ENV=test npm run test:node --workspaces --if-present",
     "test:longevity": "npm --prefix packages/reliability-tests run test:longevity",
+    "test:high-throughput": "npm --prefix packages/reliability-tests run test:high-throughput",
+    "test:throughput-sizes": "npm --prefix packages/reliability-tests run test:throughput-sizes",
     "proto": "npm run proto --workspaces --if-present",
     "deploy": "node ci/deploy.js",
     "doc": "run-s doc:*",
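Note on the workflow change: the `test` and `single-test` jobs are mutually exclusive on the `test_type` input. Selecting `all` fans out via the matrix (one runner per suite, with `fail-fast: false` so one failing suite does not cancel the others), while any single choice runs only `single-test`. From the command line, the dispatch should be triggerable with the GitHub CLI, e.g. `gh workflow run test-reliability.yml -f test_type=all` (assuming `gh` is installed and authenticated against the repository).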
diff --git a/packages/reliability-tests/package.json b/packages/reliability-tests/package.json
index 86bc7e095c..f7acb8bbd9 100644
--- a/packages/reliability-tests/package.json
+++ b/packages/reliability-tests/package.json
@@ -42,6 +42,8 @@
     "check:spelling": "cspell \"{README.md,{tests,src}/**/*.ts}\"",
     "check:tsc": "tsc -p tsconfig.dev.json",
     "test:longevity": "NODE_ENV=test node ./src/run-tests.js tests/longevity.spec.ts",
+    "test:high-throughput": "NODE_ENV=test node ./src/run-tests.js tests/high-throughput.spec.ts",
+    "test:throughput-sizes": "NODE_ENV=test node ./src/run-tests.js tests/throughput-sizes.spec.ts",
     "reset-hard": "git clean -dfx -e .idea && git reset --hard && npm i && npm run build"
   },
   "engines": {
diff --git a/packages/reliability-tests/tests/high-throughput.spec.ts b/packages/reliability-tests/tests/high-throughput.spec.ts
new file mode 100644
index 0000000000..8414aab53d
--- /dev/null
+++ b/packages/reliability-tests/tests/high-throughput.spec.ts
@@ -0,0 +1,165 @@
+import { LightNode, Protocols } from "@waku/interfaces";
+import {
+  createDecoder,
+  createEncoder,
+  createLightNode,
+  utf8ToBytes
+} from "@waku/sdk";
+import {
+  delay,
+  shardInfoToPubsubTopics,
+  singleShardInfosToShardInfo,
+  singleShardInfoToPubsubTopic
+} from "@waku/utils";
+import { expect } from "chai";
+
+import {
+  afterEachCustom,
+  beforeEachCustom,
+  makeLogFileName,
+  MessageCollector,
+  ServiceNode,
+  tearDownNodes
+} from "../../tests/src/index.js";
+
+const ContentTopic = "/waku/2/content/test.high-throughput.js";
+
+describe("High Throughput Messaging", function () {
+  const testDurationMs = 20 * 60 * 1000; // 20 minutes
+  this.timeout(testDurationMs * 1.1);
+  let waku: LightNode;
+  let nwaku: ServiceNode;
+  let messageCollector: MessageCollector;
+
+  beforeEachCustom(this, async () => {
+    nwaku = new ServiceNode(makeLogFileName(this.ctx));
+    messageCollector = new MessageCollector(nwaku);
+  });
+
+  afterEachCustom(this, async () => {
+    await tearDownNodes(nwaku, waku);
+  });
+
+  it("Send/Receive thousands of messages quickly", async function () {
+    const singleShardInfo = { clusterId: 0, shard: 0 };
+    const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
+
+    const testStart = new Date();
+    const testEnd = Date.now() + testDurationMs;
+
+    const report: {
+      messageId: number;
+      timestamp: string;
+      sent: boolean;
+      received: boolean;
+      error?: string;
+    }[] = [];
+
+    await nwaku.start(
+      {
+        store: true,
+        filter: true,
+        relay: true,
+        clusterId: 0,
+        shard: [0],
+        contentTopic: [ContentTopic]
+      },
+      { retries: 3 }
+    );
+
+    await delay(1000);
+
+    await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo));
+
+    waku = await createLightNode({ networkConfig: shardInfo });
+    await waku.start();
+    await waku.dial(await nwaku.getMultiaddrWithId());
+    await waku.waitForPeers([Protocols.Filter]);
+
+    const decoder = createDecoder(ContentTopic, singleShardInfo);
+    const hasSubscribed = await waku.filter.subscribe(
+      [decoder],
+      messageCollector.callback
+    );
+    if (!hasSubscribed) throw new Error("Failed to subscribe from the start.");
+
+    const encoder = createEncoder({
+      contentTopic: ContentTopic,
+      pubsubTopicShardInfo: singleShardInfo
+    });
+
+    expect(encoder.pubsubTopic).to.eq(
+      singleShardInfoToPubsubTopic(singleShardInfo)
+    );
+
+    let messageId = 0;
+
+    // Send messages as fast as possible until testEnd
+    while (Date.now() < testEnd) {
+      const now = new Date();
+      const message = `msg-${messageId}`;
+      let sent = false;
+      let received = false;
+      let err: string | undefined;
+
+      try {
+        await nwaku.sendMessage(
+          ServiceNode.toMessageRpcQuery({
+            contentTopic: ContentTopic,
+            payload: utf8ToBytes(message)
+          })
+        );
+        sent = true;
+
+        received = await messageCollector.waitForMessages(1, {
+          timeoutDuration: 2000
+        });
+
+        if (received) {
+          messageCollector.verifyReceivedMessage(0, {
+            expectedMessageText: message,
+            expectedContentTopic: ContentTopic,
+            expectedPubsubTopic: shardInfoToPubsubTopics(shardInfo)[0]
+          });
+        }
+      } catch (e: any) {
+        err = e.message || String(e);
+      }
+
+      report.push({
+        messageId,
+        timestamp: now.toISOString(),
+        sent,
+        received,
+        error: err
+      });
+
+      messageId++;
+      messageCollector.list = []; // clearing the message collector
+    }
+
+    const failedMessages = report.filter(
+      (m) => !m.sent || !m.received || m.error
+    );
+
+    console.log("\n=== High Throughput Test Summary ===");
+    console.log("Start time:", testStart.toISOString());
+    console.log("End time:", new Date().toISOString());
+    console.log("Total messages:", report.length);
+    console.log("Failures:", failedMessages.length);
+
+    if (failedMessages.length > 0) {
+      console.log("\n--- Failed Messages ---");
+      for (const fail of failedMessages) {
+        console.log(
+          `#${fail.messageId} @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}`
+        );
+      }
+    }
+
+    expect(
+      failedMessages.length,
+      `Some messages failed: ${failedMessages.length}`
+    ).to.eq(0);
+  });
+});
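The spec above records a per-message `report` but only prints totals. An approximate throughput figure is easy to derive from the same data; the helper below is an illustrative sketch, not part of this patch (the `ReportEntry` shape mirrors the inline type used in the spec):

  interface ReportEntry {
    messageId: number;
    timestamp: string;
    sent: boolean;
    received: boolean;
    error?: string;
  }

  // Approximate send rate over the whole run, derived from the first and
  // last report timestamps. Returns 0 when fewer than two entries exist.
  function messagesPerSecond(report: ReportEntry[]): number {
    if (report.length < 2) return 0;
    const first = new Date(report[0].timestamp).getTime();
    const last = new Date(report[report.length - 1].timestamp).getTime();
    const elapsedSeconds = (last - first) / 1000;
    return elapsedSeconds > 0 ? report.length / elapsedSeconds : 0;
  }

  // Usage (inside the test, after the send loop):
  // console.log("Throughput:", messagesPerSecond(report).toFixed(2), "msg/s");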
diff --git a/packages/reliability-tests/tests/longevity.spec.ts b/packages/reliability-tests/tests/longevity.spec.ts
index 4790a9b36a..3abaddcde5 100644
--- a/packages/reliability-tests/tests/longevity.spec.ts
+++ b/packages/reliability-tests/tests/longevity.spec.ts
@@ -26,7 +26,7 @@ const ContentTopic = "/waku/2/content/test.js";
 
 describe("Longevity", function () {
   const testDurationMs = 2 * 60 * 60 * 1000; // 2 hours
-  this.timeout(testDurationMs + 5 * 60 * 1000);
+  this.timeout(testDurationMs * 1.1);
   let waku: LightNode;
   let nwaku: ServiceNode;
   let messageCollector: MessageCollector;
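The longevity tweak above replaces the fixed five-minute buffer with the same `testDurationMs * 1.1` convention used by the two new specs, so the Mocha timeout headroom scales with the run length: 12 minutes for the 2-hour longevity run, 2 minutes for the 20-minute throughput runs.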
diff --git a/packages/reliability-tests/tests/throughput-sizes.spec.ts b/packages/reliability-tests/tests/throughput-sizes.spec.ts
new file mode 100644
index 0000000000..911f49bc1a
--- /dev/null
+++ b/packages/reliability-tests/tests/throughput-sizes.spec.ts
@@ -0,0 +1,193 @@
+import { LightNode, Protocols } from "@waku/interfaces";
+import {
+  createDecoder,
+  createEncoder,
+  createLightNode,
+  utf8ToBytes
+} from "@waku/sdk";
+import {
+  delay,
+  shardInfoToPubsubTopics,
+  singleShardInfosToShardInfo,
+  singleShardInfoToPubsubTopic
+} from "@waku/utils";
+import { expect } from "chai";
+
+import {
+  afterEachCustom,
+  beforeEachCustom,
+  makeLogFileName,
+  MessageCollector,
+  ServiceNode,
+  tearDownNodes
+} from "../../tests/src/index.js";
+
+const ContentTopic = "/waku/2/content/test.throughput-sizes.js";
+
+function generateRandomString(size: number): string {
+  const chars =
+    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+
+  let result = "";
+  for (let i = 0; i < size; i++) {
+    result += chars.charAt(Math.floor(Math.random() * chars.length));
+  }
+  return result;
+}
+
+describe("Throughput Sanity Checks - Different Message Sizes", function () {
+  const testDurationMs = 20 * 60 * 1000; // 20 minutes
+  this.timeout(testDurationMs * 1.1);
+  let waku: LightNode;
+  let nwaku: ServiceNode;
+  let messageCollector: MessageCollector;
+
+  beforeEachCustom(this, async () => {
+    nwaku = new ServiceNode(makeLogFileName(this.ctx));
+    messageCollector = new MessageCollector(nwaku);
+  });
+
+  afterEachCustom(this, async () => {
+    await tearDownNodes(nwaku, waku);
+  });
+
+  it("Send/Receive messages of varying sizes", async function () {
+    const singleShardInfo = { clusterId: 0, shard: 0 };
+    const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
+
+    const testStart = new Date();
+    const testEnd = Date.now() + testDurationMs;
+
+    const sizes = [10, 100, 1000, 10_000, 100_000]; // bytes
+
+    await nwaku.start(
+      {
+        store: true,
+        filter: true,
+        relay: true,
+        clusterId: 0,
+        shard: [0],
+        contentTopic: [ContentTopic]
+      },
+      { retries: 3 }
+    );
+
+    await delay(1000);
+
+    await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo));
+
+    waku = await createLightNode({ networkConfig: shardInfo });
+    await waku.start();
+    await waku.dial(await nwaku.getMultiaddrWithId());
+    await waku.waitForPeers([Protocols.Filter]);
+
+    const decoder = createDecoder(ContentTopic, singleShardInfo);
+    const hasSubscribed = await waku.filter.subscribe(
+      [decoder],
+      messageCollector.callback
+    );
+    if (!hasSubscribed) throw new Error("Failed to subscribe from the start.");
+
+    const encoder = createEncoder({
+      contentTopic: ContentTopic,
+      pubsubTopicShardInfo: singleShardInfo
+    });
+
+    expect(encoder.pubsubTopic).to.eq(
+      singleShardInfoToPubsubTopic(singleShardInfo)
+    );
+
+    let messageId = 0;
+    const report: {
+      messageId: number;
+      size: number;
+      timestamp: string;
+      sent: boolean;
+      received: boolean;
+      error?: string;
+    }[] = [];
+
+    while (Date.now() < testEnd) {
+      const now = new Date();
+      // Pick a random size from sizes array
+      const size = sizes[Math.floor(Math.random() * sizes.length)];
+      const message = generateRandomString(size);
+      let sent = false;
+      let received = false;
+      let err: string | undefined;
+
+      try {
+        await nwaku.sendMessage(
+          ServiceNode.toMessageRpcQuery({
+            contentTopic: ContentTopic,
+            payload: utf8ToBytes(message)
+          })
+        );
+        sent = true;
+
+        received = await messageCollector.waitForMessages(1, {
+          timeoutDuration: 3000
+        });
+
+        if (received) {
+          messageCollector.verifyReceivedMessage(0, {
+            expectedMessageText: message,
+            expectedContentTopic: ContentTopic,
+            expectedPubsubTopic: shardInfoToPubsubTopics(shardInfo)[0]
+          });
+        }
+      } catch (e: any) {
+        err = e.message || String(e);
+      }
+
+      report.push({
+        messageId,
+        size,
+        timestamp: now.toISOString(),
+        sent,
+        received,
+        error: err
+      });
+
+      messageId++;
+      messageCollector.list = []; // clearing the message collector
+      await delay(400);
+    }
+
+    const failedMessages = report.filter(
+      (m) => !m.sent || !m.received || m.error
+    );
+
+    console.log("\n=== Throughput Sizes Test Summary ===");
+    console.log("Start time:", testStart.toISOString());
+    console.log("End time:", new Date().toISOString());
+    console.log("Total messages:", report.length);
+    console.log("Failures:", failedMessages.length);
+
+    // Additional size info
+    const sizeCounts: Record<number, number> = {};
+    for (const entry of report) {
+      sizeCounts[entry.size] = (sizeCounts[entry.size] || 0) + 1;
+    }
+    console.log("\nMessage size distribution:");
+    for (const size of Object.keys(sizeCounts).sort(
+      (a, b) => Number(a) - Number(b)
+    )) {
+      console.log(`Size ${size} bytes: ${sizeCounts[Number(size)]} messages`);
+    }
+
+    if (failedMessages.length > 0) {
+      console.log("\n--- Failed Messages ---");
+      for (const fail of failedMessages) {
+        console.log(
+          `#${fail.messageId} (size: ${fail.size} bytes) @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}`
+        );
+      }
+    }
+
+    expect(
+      failedMessages.length,
+      `Some messages failed: ${failedMessages.length}`
+    ).to.eq(0);
+  });
+});
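One caveat on the spec above: because each payload size is drawn with `Math.random()`, a short or interrupted run can under-sample some sizes (the printed size distribution makes this visible). A deterministic rotation would guarantee even coverage; a minimal sketch, reusing the spec's `sizes` array and `messageId` counter, not part of this patch:

  const sizes = [10, 100, 1000, 10_000, 100_000]; // bytes, as in the spec

  // Cycle through the sizes in order instead of sampling at random,
  // so every size is exercised equally often regardless of run length.
  function sizeForMessage(messageId: number): number {
    return sizes[messageId % sizes.length];
  }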