chore: add throughput reliability tests (#2444)

* chore: add throughput reliability tests

* chore: add capability to run all test types
Author: fbarbu15 · 2025-07-14 11:38:42 +03:00 · committed by GitHub
Commit: 5d8cfff7eb · Parent: f75634d9c5
6 changed files with 422 additions and 6 deletions

View File (GitHub Actions workflow "Run Reliability Test")

@@ -2,20 +2,33 @@ name: Run Reliability Test
 
 on:
   workflow_dispatch:
-  push:
-    branches:
-      - "chore/longevity-tests"
+    inputs:
+      test_type:
+        description: 'Type of reliability test to run'
+        required: true
+        default: 'longevity'
+        type: choice
+        options:
+          - longevity
+          - high-throughput
+          - throughput-sizes
+          - all
 
 env:
   NODE_JS: "22"
 
 jobs:
-  node:
+  test:
     runs-on: ubuntu-latest
     permissions:
       contents: read
       actions: read
       checks: write
+    strategy:
+      matrix:
+        test_type: [longevity, high-throughput, throughput-sizes]
+      fail-fast: false
+    if: ${{ github.event.inputs.test_type == 'all' }}
     steps:
       - uses: actions/checkout@v3
         with:
@@ -34,4 +47,45 @@ jobs:
       - name: Run tests
         timeout-minutes: 150
-        run: npm run test:longevity
+        run: |
+          if [ "${{ matrix.test_type }}" = "high-throughput" ]; then
+            npm run test:high-throughput
+          elif [ "${{ matrix.test_type }}" = "throughput-sizes" ]; then
+            npm run test:throughput-sizes
+          else
+            npm run test:longevity
+          fi
+
+  single-test:
+    runs-on: ubuntu-latest
+    permissions:
+      contents: read
+      actions: read
+      checks: write
+    if: ${{ github.event.inputs.test_type != 'all' }}
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          repository: waku-org/js-waku
+      - name: Remove unwanted software
+        uses: ./.github/actions/prune-vm
+      - uses: actions/setup-node@v3
+        with:
+          node-version: ${{ env.NODE_JS }}
+      - uses: ./.github/actions/npm
+      - run: npm run build:esm
+      - name: Run tests
+        timeout-minutes: 150
+        run: |
+          if [ "${{ github.event.inputs.test_type }}" = "high-throughput" ]; then
+            npm run test:high-throughput
+          elif [ "${{ github.event.inputs.test_type }}" = "throughput-sizes" ]; then
+            npm run test:throughput-sizes
+          else
+            npm run test:longevity
+          fi
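Note on the dispatch logic: choosing `all` activates the matrix job, which fans the three suites out across parallel runners, while any single choice runs the `single-test` job once. Both jobs resolve the selected `test_type` with the same if/elif chain, falling back to longevity. As a TypeScript sketch of that mapping (illustrative only; the workflow itself uses plain shell):

// Illustrative TypeScript sketch of the test_type -> npm script mapping (not part of this diff).
function scriptFor(testType: string): string {
  switch (testType) {
    case "high-throughput":
      return "test:high-throughput";
    case "throughput-sizes":
      return "test:throughput-sizes";
    default:
      return "test:longevity"; // mirrors the shell `else` fallback
  }
}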

View File: package.json (repo root)

@@ -34,6 +34,8 @@
     "test:browser": "NODE_ENV=test npm run test:browser --workspaces --if-present",
     "test:node": "NODE_ENV=test npm run test:node --workspaces --if-present",
     "test:longevity": "npm --prefix packages/reliability-tests run test:longevity",
+    "test:high-throughput": "npm --prefix packages/reliability-tests run test:high-throughput",
+    "test:throughput-sizes": "npm --prefix packages/reliability-tests run test:throughput-sizes",
     "proto": "npm run proto --workspaces --if-present",
     "deploy": "node ci/deploy.js",
     "doc": "run-s doc:*",

View File: packages/reliability-tests/package.json

@@ -42,6 +42,8 @@
     "check:spelling": "cspell \"{README.md,{tests,src}/**/*.ts}\"",
     "check:tsc": "tsc -p tsconfig.dev.json",
     "test:longevity": "NODE_ENV=test node ./src/run-tests.js tests/longevity.spec.ts",
+    "test:high-throughput": "NODE_ENV=test node ./src/run-tests.js tests/high-throughput.spec.ts",
+    "test:throughput-sizes": "NODE_ENV=test node ./src/run-tests.js tests/throughput-sizes.spec.ts",
     "reset-hard": "git clean -dfx -e .idea && git reset --hard && npm i && npm run build"
   },
   "engines": {

View File: packages/reliability-tests/tests/high-throughput.spec.ts (new file)

@@ -0,0 +1,165 @@
import { LightNode, Protocols } from "@waku/interfaces";
import {
  createDecoder,
  createEncoder,
  createLightNode,
  utf8ToBytes
} from "@waku/sdk";
import {
  delay,
  shardInfoToPubsubTopics,
  singleShardInfosToShardInfo,
  singleShardInfoToPubsubTopic
} from "@waku/utils";
import { expect } from "chai";

import {
  afterEachCustom,
  beforeEachCustom,
  makeLogFileName,
  MessageCollector,
  ServiceNode,
  tearDownNodes
} from "../../tests/src/index.js";

const ContentTopic = "/waku/2/content/test.high-throughput.js";

describe("High Throughput Messaging", function () {
  const testDurationMs = 20 * 60 * 1000; // 20 minutes
  this.timeout(testDurationMs * 1.1);

  let waku: LightNode;
  let nwaku: ServiceNode;
  let messageCollector: MessageCollector;

  beforeEachCustom(this, async () => {
    nwaku = new ServiceNode(makeLogFileName(this.ctx));
    messageCollector = new MessageCollector(nwaku);
  });

  afterEachCustom(this, async () => {
    await tearDownNodes(nwaku, waku);
  });

  it("Send/Receive thousands of messages quickly", async function () {
    const singleShardInfo = { clusterId: 0, shard: 0 };
    const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
    const testStart = new Date();
    const testEnd = Date.now() + testDurationMs;

    const report: {
      messageId: number;
      timestamp: string;
      sent: boolean;
      received: boolean;
      error?: string;
    }[] = [];

    await nwaku.start(
      {
        store: true,
        filter: true,
        relay: true,
        clusterId: 0,
        shard: [0],
        contentTopic: [ContentTopic]
      },
      { retries: 3 }
    );
    await delay(1000);
    await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo));

    waku = await createLightNode({ networkConfig: shardInfo });
    await waku.start();
    await waku.dial(await nwaku.getMultiaddrWithId());
    await waku.waitForPeers([Protocols.Filter]);

    const decoder = createDecoder(ContentTopic, singleShardInfo);
    const hasSubscribed = await waku.filter.subscribe(
      [decoder],
      messageCollector.callback
    );
    if (!hasSubscribed) throw new Error("Failed to subscribe from the start.");

    const encoder = createEncoder({
      contentTopic: ContentTopic,
      pubsubTopicShardInfo: singleShardInfo
    });
    expect(encoder.pubsubTopic).to.eq(
      singleShardInfoToPubsubTopic(singleShardInfo)
    );

    let messageId = 0;
    // Send messages as fast as possible until testEnd
    while (Date.now() < testEnd) {
      const now = new Date();
      const message = `msg-${messageId}`;
      let sent = false;
      let received = false;
      let err: string | undefined;

      try {
        await nwaku.sendMessage(
          ServiceNode.toMessageRpcQuery({
            contentTopic: ContentTopic,
            payload: utf8ToBytes(message)
          })
        );
        sent = true;

        received = await messageCollector.waitForMessages(1, {
          timeoutDuration: 2000
        });

        if (received) {
          messageCollector.verifyReceivedMessage(0, {
            expectedMessageText: message,
            expectedContentTopic: ContentTopic,
            expectedPubsubTopic: shardInfoToPubsubTopics(shardInfo)[0]
          });
        }
      } catch (e: any) {
        err = e.message || String(e);
      }

      report.push({
        messageId,
        timestamp: now.toISOString(),
        sent,
        received,
        error: err
      });

      messageId++;
      messageCollector.list = []; // clearing the message collector
    }

    const failedMessages = report.filter(
      (m) => !m.sent || !m.received || m.error
    );

    console.log("\n=== High Throughput Test Summary ===");
    console.log("Start time:", testStart.toISOString());
    console.log("End time:", new Date().toISOString());
    console.log("Total messages:", report.length);
    console.log("Failures:", failedMessages.length);

    if (failedMessages.length > 0) {
      console.log("\n--- Failed Messages ---");
      for (const fail of failedMessages) {
        console.log(
          `#${fail.messageId} @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}`
        );
      }
    }

    expect(
      failedMessages.length,
      `Some messages failed: ${failedMessages.length}`
    ).to.eq(0);
  });
});
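The summary prints absolute counts only. Since the loop records a timestamped entry per message, a throughput figure could be derived from the same data; a possible extension (not in the diff), assuming the test's local variables are in scope:

// Possible extension (not part of this diff): derive rates from the report array.
const elapsedSec = (Date.now() - testStart.getTime()) / 1000;
const throughput = report.length / elapsedSec; // messages per second
const failureRate = failedMessages.length / Math.max(report.length, 1);
console.log(`Throughput: ${throughput.toFixed(2)} msg/s`);
console.log(`Failure rate: ${(failureRate * 100).toFixed(2)}%`);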

View File: packages/reliability-tests/tests/longevity.spec.ts

@@ -26,7 +26,7 @@ const ContentTopic = "/waku/2/content/test.js";
 describe("Longevity", function () {
   const testDurationMs = 2 * 60 * 60 * 1000; // 2 hours
-  this.timeout(testDurationMs + 5 * 60 * 1000);
+  this.timeout(testDurationMs * 1.1);
   let waku: LightNode;
   let nwaku: ServiceNode;
   let messageCollector: MessageCollector;
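This aligns all three suites on the same convention: the Mocha timeout is 1.1x the test duration. For the 2-hour longevity run that means a 12-minute grace period, up from the previous fixed 5 minutes. The arithmetic, spelled out:

// Timeout convention shared by all three suites (values from the diff).
const testDurationMs = 2 * 60 * 60 * 1000; // 7_200_000 ms = 2 hours
const timeoutMs = testDurationMs * 1.1;    // 7_920_000 ms = 2 h 12 min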

View File: packages/reliability-tests/tests/throughput-sizes.spec.ts (new file)

@@ -0,0 +1,193 @@
import { LightNode, Protocols } from "@waku/interfaces";
import {
  createDecoder,
  createEncoder,
  createLightNode,
  utf8ToBytes
} from "@waku/sdk";
import {
  delay,
  shardInfoToPubsubTopics,
  singleShardInfosToShardInfo,
  singleShardInfoToPubsubTopic
} from "@waku/utils";
import { expect } from "chai";

import {
  afterEachCustom,
  beforeEachCustom,
  makeLogFileName,
  MessageCollector,
  ServiceNode,
  tearDownNodes
} from "../../tests/src/index.js";

const ContentTopic = "/waku/2/content/test.throughput-sizes.js";

// Generates an ASCII-only string, so utf8ToBytes(result).length === size bytes.
function generateRandomString(size: number): string {
  const chars =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
  let result = "";
  for (let i = 0; i < size; i++) {
    result += chars.charAt(Math.floor(Math.random() * chars.length));
  }
  return result;
}

describe("Throughput Sanity Checks - Different Message Sizes", function () {
  const testDurationMs = 20 * 60 * 1000; // 20 minutes
  this.timeout(testDurationMs * 1.1);

  let waku: LightNode;
  let nwaku: ServiceNode;
  let messageCollector: MessageCollector;

  beforeEachCustom(this, async () => {
    nwaku = new ServiceNode(makeLogFileName(this.ctx));
    messageCollector = new MessageCollector(nwaku);
  });

  afterEachCustom(this, async () => {
    await tearDownNodes(nwaku, waku);
  });

  it("Send/Receive messages of varying sizes", async function () {
    const singleShardInfo = { clusterId: 0, shard: 0 };
    const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
    const testStart = new Date();
    const testEnd = Date.now() + testDurationMs;
    const sizes = [10, 100, 1000, 10_000, 100_000]; // bytes

    await nwaku.start(
      {
        store: true,
        filter: true,
        relay: true,
        clusterId: 0,
        shard: [0],
        contentTopic: [ContentTopic]
      },
      { retries: 3 }
    );
    await delay(1000);
    await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo));

    waku = await createLightNode({ networkConfig: shardInfo });
    await waku.start();
    await waku.dial(await nwaku.getMultiaddrWithId());
    await waku.waitForPeers([Protocols.Filter]);

    const decoder = createDecoder(ContentTopic, singleShardInfo);
    const hasSubscribed = await waku.filter.subscribe(
      [decoder],
      messageCollector.callback
    );
    if (!hasSubscribed) throw new Error("Failed to subscribe from the start.");

    const encoder = createEncoder({
      contentTopic: ContentTopic,
      pubsubTopicShardInfo: singleShardInfo
    });
    expect(encoder.pubsubTopic).to.eq(
      singleShardInfoToPubsubTopic(singleShardInfo)
    );

    let messageId = 0;
    const report: {
      messageId: number;
      size: number;
      timestamp: string;
      sent: boolean;
      received: boolean;
      error?: string;
    }[] = [];

    while (Date.now() < testEnd) {
      const now = new Date();
      // Pick a random size from the sizes array
      const size = sizes[Math.floor(Math.random() * sizes.length)];
      const message = generateRandomString(size);
      let sent = false;
      let received = false;
      let err: string | undefined;

      try {
        await nwaku.sendMessage(
          ServiceNode.toMessageRpcQuery({
            contentTopic: ContentTopic,
            payload: utf8ToBytes(message)
          })
        );
        sent = true;

        received = await messageCollector.waitForMessages(1, {
          timeoutDuration: 3000
        });

        if (received) {
          messageCollector.verifyReceivedMessage(0, {
            expectedMessageText: message,
            expectedContentTopic: ContentTopic,
            expectedPubsubTopic: shardInfoToPubsubTopics(shardInfo)[0]
          });
        }
      } catch (e: any) {
        err = e.message || String(e);
      }

      report.push({
        messageId,
        size,
        timestamp: now.toISOString(),
        sent,
        received,
        error: err
      });

      messageId++;
      messageCollector.list = []; // clearing the message collector
      await delay(400);
    }

    const failedMessages = report.filter(
      (m) => !m.sent || !m.received || m.error
    );

    console.log("\n=== Throughput Sizes Test Summary ===");
    console.log("Start time:", testStart.toISOString());
    console.log("End time:", new Date().toISOString());
    console.log("Total messages:", report.length);
    console.log("Failures:", failedMessages.length);

    // Additional size info
    const sizeCounts: Record<number, number> = {};
    for (const entry of report) {
      sizeCounts[entry.size] = (sizeCounts[entry.size] || 0) + 1;
    }
    console.log("\nMessage size distribution:");
    for (const size of Object.keys(sizeCounts).sort(
      (a, b) => Number(a) - Number(b)
    )) {
      console.log(`Size ${size} bytes: ${sizeCounts[Number(size)]} messages`);
    }

    if (failedMessages.length > 0) {
      console.log("\n--- Failed Messages ---");
      for (const fail of failedMessages) {
        console.log(
          `#${fail.messageId} (size: ${fail.size} bytes) @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}`
        );
      }
    }

    expect(
      failedMessages.length,
      `Some messages failed: ${failedMessages.length}`
    ).to.eq(0);
  });
});
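Failures are counted globally here, but message size is the variable under test, so a per-size failure breakdown would make size-dependent regressions easier to spot. A possible extension (not in the diff), reusing `report` and `sizeCounts` from the test scope:

// Possible extension (not part of this diff): failure count per message size.
const failuresBySize: Record<number, number> = {};
for (const entry of report) {
  if (!entry.sent || !entry.received || entry.error) {
    failuresBySize[entry.size] = (failuresBySize[entry.size] || 0) + 1;
  }
}
for (const [size, fails] of Object.entries(failuresBySize)) {
  console.log(`Size ${size} bytes: ${fails} failed of ${sizeCounts[Number(size)]}`);
}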