From c1f9471cd7eb9be2b2ff1ada06ae9178040665ed Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Tue, 29 Jul 2025 10:17:43 +0300 Subject: [PATCH] chore: extra reliability tests 2 (#2450) * chore: add throughput reliability tests * chore: add capability to run all * chore: add network-latency reliability tests * chore: add network-latency reliability tests * chore: add other network reliability tests * chore: add other network reliability tests * chore: fix tc cleanup * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: fix * chore: fix tests * chore: fix tests --- .github/workflows/test-reliability.yml | 17 +- package.json | 3 + packages/reliability-tests/package.json | 3 + .../tests/high-throughput.spec.ts | 160 +---------- .../reliability-tests/tests/longevity.spec.ts | 160 +---------- .../tests/low-bandwidth.spec.ts | 26 ++ .../tests/network-latency.spec.ts | 26 ++ .../tests/packet-loss.spec.ts | 24 ++ .../tests/sharedTestUtils.ts | 271 ++++++++++++++++++ .../tests/throughput-sizes.spec.ts | 197 ++----------- 10 files changed, 408 insertions(+), 479 deletions(-) create mode 100644 packages/reliability-tests/tests/low-bandwidth.spec.ts create mode 100644 packages/reliability-tests/tests/network-latency.spec.ts create mode 100644 packages/reliability-tests/tests/packet-loss.spec.ts create mode 100644 packages/reliability-tests/tests/sharedTestUtils.ts diff --git a/.github/workflows/test-reliability.yml b/.github/workflows/test-reliability.yml index ba55f6bc41..a25f5a6be3 100644 --- a/.github/workflows/test-reliability.yml +++ b/.github/workflows/test-reliability.yml @@ -12,6 +12,9 @@ on: - longevity - high-throughput - throughput-sizes + - network-latency + - low-bandwidth + - packet-loss - all env: @@ -26,7 +29,7 @@ jobs: checks: write strategy: matrix: - test_type: [longevity, high-throughput, throughput-sizes] + test_type: [longevity, high-throughput, throughput-sizes, network-latency, low-bandwidth, packet-loss] fail-fast: false if: ${{ github.event.inputs.test_type == 'all' }} steps: @@ -52,6 +55,12 @@ jobs: npm run test:high-throughput elif [ "${{ matrix.test_type }}" = "throughput-sizes" ]; then npm run test:throughput-sizes + elif [ "${{ matrix.test_type }}" = "network-latency" ]; then + npm run test:network-latency + elif [ "${{ matrix.test_type }}" = "low-bandwidth" ]; then + npm run test:low-bandwidth + elif [ "${{ matrix.test_type }}" = "packet-loss" ]; then + npm run test:packet-loss else npm run test:longevity fi @@ -86,6 +95,12 @@ jobs: npm run test:high-throughput elif [ "${{ github.event.inputs.test_type }}" = "throughput-sizes" ]; then npm run test:throughput-sizes + elif [ "${{ github.event.inputs.test_type }}" = "network-latency" ]; then + npm run test:network-latency + elif [ "${{ github.event.inputs.test_type }}" = "low-bandwidth" ]; then + npm run test:low-bandwidth + elif [ "${{ github.event.inputs.test_type }}" = "packet-loss" ]; then + npm run test:packet-loss else npm run test:longevity fi diff --git a/package.json b/package.json index 6ad5fcf0bb..53baf7ee6b 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,9 @@ "test:longevity": "npm --prefix packages/reliability-tests run test:longevity", "test:high-throughput": "npm --prefix packages/reliability-tests run test:high-throughput", "test:throughput-sizes": "npm --prefix packages/reliability-tests run test:throughput-sizes", + "test:network-latency": "npm --prefix packages/reliability-tests run test:network-latency", + "test:low-bandwidth": "npm --prefix packages/reliability-tests run test:low-bandwidth", + "test:packet-loss": "npm --prefix packages/reliability-tests run test:packet-loss", "proto": "npm run proto --workspaces --if-present", "deploy": "node ci/deploy.js", "doc": "run-s doc:*", diff --git a/packages/reliability-tests/package.json b/packages/reliability-tests/package.json index f7acb8bbd9..97dfac7ba1 100644 --- a/packages/reliability-tests/package.json +++ b/packages/reliability-tests/package.json @@ -44,6 +44,9 @@ "test:longevity": "NODE_ENV=test node ./src/run-tests.js tests/longevity.spec.ts", "test:high-throughput": "NODE_ENV=test node ./src/run-tests.js tests/high-throughput.spec.ts", "test:throughput-sizes": "NODE_ENV=test node ./src/run-tests.js tests/throughput-sizes.spec.ts", + "test:network-latency": "NODE_ENV=test node ./src/run-tests.js tests/network-latency.spec.ts", + "test:low-bandwidth": "NODE_ENV=test node ./src/run-tests.js tests/low-bandwidth.spec.ts", + "test:packet-loss": "NODE_ENV=test node ./src/run-tests.js tests/packet-loss.spec.ts", "reset-hard": "git clean -dfx -e .idea && git reset --hard && npm i && npm run build" }, "engines": { diff --git a/packages/reliability-tests/tests/high-throughput.spec.ts b/packages/reliability-tests/tests/high-throughput.spec.ts index 7a11e9caf9..f27b6ebfcd 100644 --- a/packages/reliability-tests/tests/high-throughput.spec.ts +++ b/packages/reliability-tests/tests/high-throughput.spec.ts @@ -1,158 +1,16 @@ -import { LightNode, Protocols } from "@waku/interfaces"; -import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk"; -import { - contentTopicToPubsubTopic, - createRoutingInfo, - delay -} from "@waku/utils"; -import { expect } from "chai"; - -import { - afterEachCustom, - beforeEachCustom, - makeLogFileName, - MessageCollector, - ServiceNode, - tearDownNodes -} from "../../tests/src/index.js"; - -const ContentTopic = "/waku/2/content/test.high-throughput.js"; -const NetworkConfig = { clusterId: 0, numShardsInCluster: 8 }; -const RoutingInfo = createRoutingInfo(NetworkConfig, { - contentTopic: ContentTopic -}); +import { runTest, setupTest } from "./sharedTestUtils.js"; describe("High Throughput Messaging", function () { const testDurationMs = 20 * 60 * 1000; // 20 minutes - this.timeout(testDurationMs * 1.1); - let waku: LightNode; - let nwaku: ServiceNode; - let messageCollector: MessageCollector; + const testContext = {}; - beforeEachCustom(this, async () => { - nwaku = new ServiceNode(makeLogFileName(this.ctx)); - messageCollector = new MessageCollector(nwaku); - }); + setupTest(this, testContext); - afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); - }); - - it("Send/Receive thousands of messages quickly", async function () { - const testStart = new Date(); - const testEnd = Date.now() + testDurationMs; - - const report: { - messageId: number; - timestamp: string; - sent: boolean; - received: boolean; - error?: string; - }[] = []; - - await nwaku.start( - { - store: true, - filter: true, - relay: true, - clusterId: NetworkConfig.clusterId, - numShardsInNetwork: NetworkConfig.numShardsInCluster, - contentTopic: [ContentTopic] - }, - { retries: 3 } - ); - - await delay(1000); - - await nwaku.ensureSubscriptions([ - contentTopicToPubsubTopic( - ContentTopic, - NetworkConfig.clusterId, - NetworkConfig.numShardsInCluster - ) - ]); - - waku = await createLightNode({ networkConfig: NetworkConfig }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.Filter]); - - const decoder = createDecoder(ContentTopic, RoutingInfo); - const hasSubscribed = await waku.filter.subscribe( - [decoder], - messageCollector.callback - ); - if (!hasSubscribed) throw new Error("Failed to subscribe from the start."); - - let messageId = 0; - - // Send messages as fast as possible until testEnd - while (Date.now() < testEnd) { - const now = new Date(); - const message = `msg-${messageId}`; - let sent = false; - let received = false; - let err: string | undefined; - - try { - await nwaku.sendMessage( - ServiceNode.toMessageRpcQuery({ - contentTopic: ContentTopic, - payload: utf8ToBytes(message) - }), - RoutingInfo - ); - sent = true; - - received = await messageCollector.waitForMessages(1, { - timeoutDuration: 2000 - }); - - if (received) { - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: message, - expectedContentTopic: ContentTopic, - expectedPubsubTopic: RoutingInfo.pubsubTopic - }); - } - } catch (e: any) { - err = e.message || String(e); - } - - report.push({ - messageId, - timestamp: now.toISOString(), - sent, - received, - error: err - }); - - messageId++; - messageCollector.list = []; // clearing the message collector - } - - const failedMessages = report.filter( - (m) => !m.sent || !m.received || m.error - ); - - console.log("\n=== High Throughput Test Summary ==="); - console.log("Start time:", testStart.toISOString()); - console.log("End time:", new Date().toISOString()); - console.log("Total messages:", report.length); - console.log("Failures:", failedMessages.length); - - if (failedMessages.length > 0) { - console.log("\n--- Failed Messages ---"); - for (const fail of failedMessages) { - console.log( - `#${fail.messageId} @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}` - ); - } - } - - expect( - failedMessages.length, - `Some messages failed: ${failedMessages.length}` - ).to.eq(0); + runTest({ + testContext: testContext, + testDurationMs: testDurationMs, + testName: "High Throughput Messaging", + messageGenerator: (messageId: number) => `High-Throughput-${messageId}`, + delayBetweenMessagesMs: 0 }); }); diff --git a/packages/reliability-tests/tests/longevity.spec.ts b/packages/reliability-tests/tests/longevity.spec.ts index e0ec05678f..cb6b9e9b82 100644 --- a/packages/reliability-tests/tests/longevity.spec.ts +++ b/packages/reliability-tests/tests/longevity.spec.ts @@ -1,158 +1,16 @@ -import { LightNode, Protocols } from "@waku/interfaces"; -import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk"; -import { - contentTopicToPubsubTopic, - createRoutingInfo, - delay -} from "@waku/utils"; -import { expect } from "chai"; - -import { - afterEachCustom, - beforeEachCustom, - makeLogFileName, - MessageCollector, - ServiceNode, - tearDownNodes -} from "../../tests/src/index.js"; - -const ContentTopic = "/waku/2/content/test.js"; +import { runTest, setupTest } from "./sharedTestUtils.js"; describe("Longevity", function () { const testDurationMs = 2 * 60 * 60 * 1000; // 2 hours - this.timeout(testDurationMs * 1.1); - let waku: LightNode; - let nwaku: ServiceNode; - let messageCollector: MessageCollector; + const testContext = {}; - beforeEachCustom(this, async () => { - nwaku = new ServiceNode(makeLogFileName(this.ctx)); - messageCollector = new MessageCollector(nwaku); - }); + setupTest(this, testContext); - afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); - }); - - it("Filter - 2 hours", async function () { - const networkConfig = { clusterId: 0, numShardsInCluster: 8 }; - - const testStart = new Date(); - - const testEnd = Date.now() + testDurationMs; - - const report: { - messageId: number; - timestamp: string; - sent: boolean; - received: boolean; - error?: string; - }[] = []; - - await nwaku.start( - { - store: true, - filter: true, - relay: true, - clusterId: 0, - shard: [0], - contentTopic: [ContentTopic] - }, - { retries: 3 } - ); - - await nwaku.ensureSubscriptions([ - contentTopicToPubsubTopic( - ContentTopic, - networkConfig.clusterId, - networkConfig.numShardsInCluster - ) - ]); - - waku = await createLightNode({ networkConfig }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.Filter]); - - const routingInfo = createRoutingInfo(networkConfig, { - contentTopic: ContentTopic - }); - const decoder = createDecoder(ContentTopic, routingInfo); - const hasSubscribed = await waku.filter.subscribe( - [decoder], - messageCollector.callback - ); - if (!hasSubscribed) throw new Error("Failed to subscribe from the start."); - - let messageId = 0; - - while (Date.now() < testEnd) { - const now = new Date(); - const message = `ping-${messageId}`; - let sent = false; - let received = false; - let err: string | undefined; - - try { - await nwaku.sendMessage( - ServiceNode.toMessageRpcQuery({ - contentTopic: ContentTopic, - payload: utf8ToBytes(message) - }), - routingInfo - ); - sent = true; - - received = await messageCollector.waitForMessages(1, { - timeoutDuration: 5000 - }); - - if (received) { - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: message, - expectedContentTopic: ContentTopic, - expectedPubsubTopic: routingInfo.pubsubTopic - }); - } - } catch (e: any) { - err = e.message || String(e); - } - - report.push({ - messageId, - timestamp: now.toISOString(), - sent, - received, - error: err - }); - - messageId++; - messageCollector.list = []; // clearing the message collector - await delay(400); - } - - const failedMessages = report.filter( - (m) => !m.sent || !m.received || m.error - ); - - console.log("\n=== Longevity Test Summary ==="); - console.log("Start time:", testStart.toISOString()); - console.log("End time:", new Date().toISOString()); - console.log("Total messages:", report.length); - console.log("Failures:", failedMessages.length); - - if (failedMessages.length > 0) { - console.log("\n--- Failed Messages ---"); - for (const fail of failedMessages) { - console.log( - `#${fail.messageId} @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}` - ); - } - } - - expect( - failedMessages.length, - `Some messages failed: ${failedMessages.length}` - ).to.eq(0); + runTest({ + testContext: testContext, + testDurationMs: testDurationMs, + testName: "Longevity", + messageGenerator: (messageId: number) => `Longevity-${messageId}`, + delayBetweenMessagesMs: 400 }); }); diff --git a/packages/reliability-tests/tests/low-bandwidth.spec.ts b/packages/reliability-tests/tests/low-bandwidth.spec.ts new file mode 100644 index 0000000000..998e60e3e1 --- /dev/null +++ b/packages/reliability-tests/tests/low-bandwidth.spec.ts @@ -0,0 +1,26 @@ +import { execCommand, runTest, setupTest } from "./sharedTestUtils.js"; + +describe("Low Bandwith Test", function () { + const testDurationMs = 10 * 60 * 1000; // 10 mins + const testContext = {}; + + setupTest(this, testContext); + + beforeEach(async () => { + execCommand( + "sudo tc qdisc add dev eth0 root tbf rate 1mbit burst 32kbit limit 12500" + ); + }); + + afterEach(async () => { + execCommand("sudo tc qdisc del dev eth0 root"); + }); + + runTest({ + testContext: testContext, + testDurationMs: testDurationMs, + testName: "Low Bandwith Test", + messageGenerator: (messageId: number) => `Low-Bandwith-${messageId}`, + delayBetweenMessagesMs: 400 + }); +}); diff --git a/packages/reliability-tests/tests/network-latency.spec.ts b/packages/reliability-tests/tests/network-latency.spec.ts new file mode 100644 index 0000000000..da6396240a --- /dev/null +++ b/packages/reliability-tests/tests/network-latency.spec.ts @@ -0,0 +1,26 @@ +import { execCommand, runTest, setupTest } from "./sharedTestUtils.js"; + +describe("Network Latency and Jitter Test", function () { + const testDurationMs = 10 * 60 * 1000; // 10 mins + const testContext = {}; + + setupTest(this, testContext); + + beforeEach(async () => { + execCommand( + "sudo tc qdisc add dev eth0 root netem delay 300ms 50ms distribution normal" + ); + }); + + afterEach(async () => { + execCommand("sudo tc qdisc del dev eth0 root netem"); + }); + + runTest({ + testContext: testContext, + testDurationMs: testDurationMs, + testName: "Network Latency and Jitter Test", + messageGenerator: (messageId: number) => `Network-Latency-${messageId}`, + delayBetweenMessagesMs: 400 + }); +}); diff --git a/packages/reliability-tests/tests/packet-loss.spec.ts b/packages/reliability-tests/tests/packet-loss.spec.ts new file mode 100644 index 0000000000..eeb8d0bcf7 --- /dev/null +++ b/packages/reliability-tests/tests/packet-loss.spec.ts @@ -0,0 +1,24 @@ +import { execCommand, runTest, setupTest } from "./sharedTestUtils.js"; + +describe("Packet Loss Test", function () { + const testDurationMs = 10 * 60 * 1000; // 10 mins + const testContext = {}; + + setupTest(this, testContext); + + beforeEach(async () => { + execCommand("sudo tc qdisc add dev eth0 root netem loss 2%"); + }); + + afterEach(async () => { + execCommand("sudo tc qdisc del dev eth0 root netem"); + }); + + runTest({ + testContext: testContext, + testDurationMs: testDurationMs, + testName: "Packet Loss Test", + messageGenerator: (messageId: number) => `Packet-Loss-${messageId}`, + delayBetweenMessagesMs: 400 + }); +}); diff --git a/packages/reliability-tests/tests/sharedTestUtils.ts b/packages/reliability-tests/tests/sharedTestUtils.ts new file mode 100644 index 0000000000..b979a79073 --- /dev/null +++ b/packages/reliability-tests/tests/sharedTestUtils.ts @@ -0,0 +1,271 @@ +import { execSync } from "child_process"; + +import { AutoSharding, LightNode, Protocols } from "@waku/interfaces"; +import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk"; +import { createRoutingInfo, delay } from "@waku/utils"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + makeLogFileName, + MessageCollector, + ServiceNode, + tearDownNodes +} from "../../tests/src/index.js"; + +export interface TestContext { + waku?: LightNode; + nwaku?: ServiceNode; + messageCollector?: MessageCollector; + report?: Array<{ + messageId: number; + size?: number; + timestamp: string; + sent: boolean; + received: boolean; + error?: string; + }>; +} + +/* eslint-disable no-console */ + +export function setupTest(ctx: Mocha.Suite, testContext: TestContext): void { + beforeEachCustom(ctx, async () => { + testContext.nwaku = new ServiceNode(makeLogFileName(ctx.ctx)); + testContext.messageCollector = new MessageCollector(); + }); + + afterEachCustom(ctx, async () => { + if (testContext.nwaku && testContext.waku) { + await tearDownNodes(testContext.nwaku, testContext.waku); + } + }); +} + +export function printSizeDistributionReport( + report: TestContext["report"] +): void { + if (!report) return; + + const sizes = report.map((r) => r.size ?? 0); + const sizesSet = new Set(sizes); + if (sizesSet.size <= 1) return; // Only one size, no need to print distribution + + const buckets = [10, 100, 1000, 10000, 100000]; + const sizeCounts: Record = {}; + for (const entry of report) { + if (entry.size !== undefined) { + // Find closest bucket + let closest = buckets[0]; + let minDiff = Math.abs(entry.size - buckets[0]); + for (const b of buckets) { + const diff = Math.abs(entry.size - b); + if (diff < minDiff) { + minDiff = diff; + closest = b; + } + } + sizeCounts[closest] = (sizeCounts[closest] || 0) + 1; + } + } + console.log("\nMessage size distribution (mapped to fixed buckets):"); + for (const size of buckets) { + console.log(`Size ${size} bytes: ${sizeCounts[size] || 0} messages`); + } +} + +export interface RunTestOptions { + testContext: TestContext; + testDurationMs: number; + testName: string; + messageGenerator?: (messageId: number) => string; + messageTimeoutMs?: number; + delayBetweenMessagesMs?: number; +} + +export function runTest(options: RunTestOptions): void { + const { + testContext, + testDurationMs, + testName, + messageGenerator, + delayBetweenMessagesMs = 400 + } = options; + + describe(testName, function () { + this.timeout(testDurationMs * 1.1); // Timing out if test runs for 10% more than expected. Used to avoid hangs and freezes + + it(testName, async function () { + const clusterId = 2; + const shards = [1]; + const numShardsInCluster = 8; + + const contentTopic = "/waku/2/content/test.js"; + + const testStart = new Date(); + const testEnd = Date.now() + testDurationMs; + + const testNetworkConfig: AutoSharding = { + clusterId: clusterId, + numShardsInCluster: numShardsInCluster + }; + const testRoutingInfo = createRoutingInfo(testNetworkConfig, { + contentTopic: contentTopic + }); + + const report: { + messageId: number; + size?: number; + timestamp: string; + sent: boolean; + received: boolean; + error?: string; + }[] = []; + + await testContext.nwaku!.start( + { + store: true, + filter: true, + relay: true, + clusterId, + shard: shards, + numShardsInNetwork: numShardsInCluster, + contentTopic: [contentTopic] + }, + { retries: 3 } + ); + + await delay(1000); + + await testContext.nwaku!.ensureSubscriptions([ + testRoutingInfo.pubsubTopic + ]); + + testContext.waku = await createLightNode({ + networkConfig: { clusterId, numShardsInCluster } + }); + await testContext.waku.start(); + await testContext.waku.dial( + await testContext.nwaku!.getMultiaddrWithId() + ); + await testContext.waku.waitForPeers([Protocols.Filter]); + + const decoder = createDecoder(contentTopic, testRoutingInfo); + const hasSubscribed = await testContext.waku.filter.subscribe( + [decoder], + testContext.messageCollector!.callback + ); + if (!hasSubscribed) + throw new Error("Failed to subscribe from the start."); + + let messageId = 0; + + console.log("Received messages via filter:"); + + while (Date.now() < testEnd) { + const now = new Date(); + const message = messageGenerator + ? messageGenerator(messageId) + : `ping-${messageId}`; + let sent = false; + let received = false; + let err: string | undefined; + + try { + await testContext.nwaku!.sendMessage( + ServiceNode.toMessageRpcQuery({ + contentTopic, + payload: utf8ToBytes(message) + }), + testRoutingInfo + ); + sent = true; + + received = await testContext.messageCollector!.waitForMessages(1, { + timeoutDuration: 5000 + }); + + if (received) { + testContext.messageCollector!.verifyReceivedMessage(0, { + expectedMessageText: message, + expectedContentTopic: contentTopic, + expectedPubsubTopic: testRoutingInfo.pubsubTopic + }); + } + + console.log( + JSON.stringify(testContext.messageCollector!.getMessage(0)) + ); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } catch (e: any) { + err = e.message || String(e); + console.log(`Issue/Error/Failure for message: ${String(e)}`); + } + + report.push({ + messageId, + size: message.length, + timestamp: now.toISOString(), + sent, + received, + error: err + }); + + messageId++; + testContext.messageCollector!.list = []; // clearing the message collector + await delay(delayBetweenMessagesMs); + } + + testContext.report = report; + + const failedMessages = report.filter( + (m) => !m.sent || !m.received || m.error + ); + + console.log(`\n=== ${testName} Summary ===`); + console.log("Start time:", testStart.toISOString()); + console.log("End time:", new Date().toISOString()); + console.log("Total messages:", report.length); + console.log("Failures:", failedMessages.length); + + if (failedMessages.length > 0) { + console.log("\n--- Failed Messages ---"); + for (const fail of failedMessages) { + console.log( + `#${fail.messageId} (size: ${fail.size} bytes) @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}` + ); + } + } + + expect( + failedMessages.length, + `Some messages failed: ${failedMessages.length}` + ).to.eq(0); + }); + }); +} + +export function generateRandomString(size: number): string { + const chars = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + + let result = ""; + for (let i = 0; i < size; i++) { + result += chars.charAt(Math.floor(Math.random() * chars.length)); + } + return result; +} + +export function execCommand(command: string): void { + try { + execSync(command); + } catch (e) { + console.warn( + `Failed to execute command "${command}", continuing without it:`, + e + ); + } +} + +/* eslint-enable no-console */ diff --git a/packages/reliability-tests/tests/throughput-sizes.spec.ts b/packages/reliability-tests/tests/throughput-sizes.spec.ts index 7044176223..e88186e189 100644 --- a/packages/reliability-tests/tests/throughput-sizes.spec.ts +++ b/packages/reliability-tests/tests/throughput-sizes.spec.ts @@ -1,187 +1,32 @@ -import { LightNode, Protocols } from "@waku/interfaces"; -import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk"; import { - contentTopicToPubsubTopic, - createRoutingInfo, - delay -} from "@waku/utils"; -import { expect } from "chai"; + generateRandomString, + printSizeDistributionReport, + runTest, + setupTest +} from "./sharedTestUtils.js"; -import { - afterEachCustom, - beforeEachCustom, - makeLogFileName, - MessageCollector, - ServiceNode, - tearDownNodes -} from "../../tests/src/index.js"; - -const ContentTopic = "/waku/2/content/test.throughput-sizes.js"; - -function generateRandomString(size: number): string { - const chars = - "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; - - let result = ""; - for (let i = 0; i < size; i++) { - result += chars.charAt(Math.floor(Math.random() * chars.length)); - } - return result; -} +const sizes = [10, 100, 1000, 10_000, 100_000]; // bytes describe("Throughput Sanity Checks - Different Message Sizes", function () { const testDurationMs = 20 * 60 * 1000; // 20 minute - this.timeout(testDurationMs * 1.1); - let waku: LightNode; - let nwaku: ServiceNode; - let messageCollector: MessageCollector; + const testContext: { report?: any } = {}; - beforeEachCustom(this, async () => { - nwaku = new ServiceNode(makeLogFileName(this.ctx)); - messageCollector = new MessageCollector(nwaku); + setupTest(this, testContext); + + afterEach(async () => { + if (testContext.report) { + printSizeDistributionReport(testContext.report); + } }); - afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); - }); - - it("Send/Receive messages of varying sizes", async function () { - const networkConfig = { clusterId: 0, numShardsInCluster: 8 }; - - const testStart = new Date(); - const testEnd = Date.now() + testDurationMs; - - const sizes = [10, 100, 1000, 10_000, 100_000]; // bytes - - await nwaku.start( - { - store: true, - filter: true, - relay: true, - clusterId: 0, - shard: [0], - contentTopic: [ContentTopic] - }, - { retries: 3 } - ); - - await delay(1000); - - await nwaku.ensureSubscriptions([ - contentTopicToPubsubTopic( - ContentTopic, - networkConfig.clusterId, - networkConfig.numShardsInCluster - ) - ]); - - waku = await createLightNode({ networkConfig }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.Filter]); - - const routingInfo = createRoutingInfo(networkConfig, { - contentTopic: ContentTopic - }); - const decoder = createDecoder(ContentTopic, routingInfo); - const hasSubscribed = await waku.filter.subscribe( - [decoder], - messageCollector.callback - ); - if (!hasSubscribed) throw new Error("Failed to subscribe from the start."); - - let messageId = 0; - const report: { - messageId: number; - size: number; - timestamp: string; - sent: boolean; - received: boolean; - error?: string; - }[] = []; - - while (Date.now() < testEnd) { - const now = new Date(); - // Pick a random size from sizes array + runTest({ + testContext: testContext, + testDurationMs: testDurationMs, + testName: "Throughput Sanity Checks - Different Message Sizes", + messageGenerator: (_messageId: number) => { const size = sizes[Math.floor(Math.random() * sizes.length)]; - const message = generateRandomString(size); - let sent = false; - let received = false; - let err: string | undefined; - - try { - await nwaku.sendMessage( - ServiceNode.toMessageRpcQuery({ - contentTopic: ContentTopic, - payload: utf8ToBytes(message) - }), - routingInfo - ); - sent = true; - - received = await messageCollector.waitForMessages(1, { - timeoutDuration: 3000 - }); - - if (received) { - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: message, - expectedContentTopic: ContentTopic, - expectedPubsubTopic: routingInfo.pubsubTopic - }); - } - } catch (e: any) { - err = e.message || String(e); - } - - report.push({ - messageId, - size, - timestamp: now.toISOString(), - sent, - received, - error: err - }); - - messageId++; - messageCollector.list = []; // clearing the message collector - await delay(400); - } - - const failedMessages = report.filter( - (m) => !m.sent || !m.received || m.error - ); - - console.log("\n=== Throughput Sizes Test Summary ==="); - console.log("Start time:", testStart.toISOString()); - console.log("End time:", new Date().toISOString()); - console.log("Total messages:", report.length); - console.log("Failures:", failedMessages.length); - - // Additional size info - const sizeCounts: Record = {}; - for (const entry of report) { - sizeCounts[entry.size] = (sizeCounts[entry.size] || 0) + 1; - } - console.log("\nMessage size distribution:"); - for (const size of Object.keys(sizeCounts).sort( - (a, b) => Number(a) - Number(b) - )) { - console.log(`Size ${size} bytes: ${sizeCounts[Number(size)]} messages`); - } - - if (failedMessages.length > 0) { - console.log("\n--- Failed Messages ---"); - for (const fail of failedMessages) { - console.log( - `#${fail.messageId} (size: ${fail.size} bytes) @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}` - ); - } - } - - expect( - failedMessages.length, - `Some messages failed: ${failedMessages.length}` - ).to.eq(0); + return generateRandomString(size); + }, + delayBetweenMessagesMs: 400 }); });