mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-02 13:53:12 +00:00
chore: extra reliability tests 2 (#2450)
* chore: add throughput reliability tests * chore: add capability to run all * chore: add network-latency reliability tests * chore: add network-latency reliability tests * chore: add other network reliability tests * chore: add other network reliability tests * chore: fix tc cleanup * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: fix * chore: fix tests * chore: fix tests
This commit is contained in:
parent
0be4861c79
commit
c1f9471cd7
17
.github/workflows/test-reliability.yml
vendored
17
.github/workflows/test-reliability.yml
vendored
@ -12,6 +12,9 @@ on:
|
||||
- longevity
|
||||
- high-throughput
|
||||
- throughput-sizes
|
||||
- network-latency
|
||||
- low-bandwidth
|
||||
- packet-loss
|
||||
- all
|
||||
|
||||
env:
|
||||
@ -26,7 +29,7 @@ jobs:
|
||||
checks: write
|
||||
strategy:
|
||||
matrix:
|
||||
test_type: [longevity, high-throughput, throughput-sizes]
|
||||
test_type: [longevity, high-throughput, throughput-sizes, network-latency, low-bandwidth, packet-loss]
|
||||
fail-fast: false
|
||||
if: ${{ github.event.inputs.test_type == 'all' }}
|
||||
steps:
|
||||
@ -52,6 +55,12 @@ jobs:
|
||||
npm run test:high-throughput
|
||||
elif [ "${{ matrix.test_type }}" = "throughput-sizes" ]; then
|
||||
npm run test:throughput-sizes
|
||||
elif [ "${{ matrix.test_type }}" = "network-latency" ]; then
|
||||
npm run test:network-latency
|
||||
elif [ "${{ matrix.test_type }}" = "low-bandwidth" ]; then
|
||||
npm run test:low-bandwidth
|
||||
elif [ "${{ matrix.test_type }}" = "packet-loss" ]; then
|
||||
npm run test:packet-loss
|
||||
else
|
||||
npm run test:longevity
|
||||
fi
|
||||
@ -86,6 +95,12 @@ jobs:
|
||||
npm run test:high-throughput
|
||||
elif [ "${{ github.event.inputs.test_type }}" = "throughput-sizes" ]; then
|
||||
npm run test:throughput-sizes
|
||||
elif [ "${{ github.event.inputs.test_type }}" = "network-latency" ]; then
|
||||
npm run test:network-latency
|
||||
elif [ "${{ github.event.inputs.test_type }}" = "low-bandwidth" ]; then
|
||||
npm run test:low-bandwidth
|
||||
elif [ "${{ github.event.inputs.test_type }}" = "packet-loss" ]; then
|
||||
npm run test:packet-loss
|
||||
else
|
||||
npm run test:longevity
|
||||
fi
|
||||
|
||||
@ -36,6 +36,9 @@
|
||||
"test:longevity": "npm --prefix packages/reliability-tests run test:longevity",
|
||||
"test:high-throughput": "npm --prefix packages/reliability-tests run test:high-throughput",
|
||||
"test:throughput-sizes": "npm --prefix packages/reliability-tests run test:throughput-sizes",
|
||||
"test:network-latency": "npm --prefix packages/reliability-tests run test:network-latency",
|
||||
"test:low-bandwidth": "npm --prefix packages/reliability-tests run test:low-bandwidth",
|
||||
"test:packet-loss": "npm --prefix packages/reliability-tests run test:packet-loss",
|
||||
"proto": "npm run proto --workspaces --if-present",
|
||||
"deploy": "node ci/deploy.js",
|
||||
"doc": "run-s doc:*",
|
||||
|
||||
@ -44,6 +44,9 @@
|
||||
"test:longevity": "NODE_ENV=test node ./src/run-tests.js tests/longevity.spec.ts",
|
||||
"test:high-throughput": "NODE_ENV=test node ./src/run-tests.js tests/high-throughput.spec.ts",
|
||||
"test:throughput-sizes": "NODE_ENV=test node ./src/run-tests.js tests/throughput-sizes.spec.ts",
|
||||
"test:network-latency": "NODE_ENV=test node ./src/run-tests.js tests/network-latency.spec.ts",
|
||||
"test:low-bandwidth": "NODE_ENV=test node ./src/run-tests.js tests/low-bandwidth.spec.ts",
|
||||
"test:packet-loss": "NODE_ENV=test node ./src/run-tests.js tests/packet-loss.spec.ts",
|
||||
"reset-hard": "git clean -dfx -e .idea && git reset --hard && npm i && npm run build"
|
||||
},
|
||||
"engines": {
|
||||
|
||||
@ -1,158 +1,16 @@
|
||||
import { LightNode, Protocols } from "@waku/interfaces";
|
||||
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
|
||||
import {
|
||||
contentTopicToPubsubTopic,
|
||||
createRoutingInfo,
|
||||
delay
|
||||
} from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
afterEachCustom,
|
||||
beforeEachCustom,
|
||||
makeLogFileName,
|
||||
MessageCollector,
|
||||
ServiceNode,
|
||||
tearDownNodes
|
||||
} from "../../tests/src/index.js";
|
||||
|
||||
const ContentTopic = "/waku/2/content/test.high-throughput.js";
|
||||
const NetworkConfig = { clusterId: 0, numShardsInCluster: 8 };
|
||||
const RoutingInfo = createRoutingInfo(NetworkConfig, {
|
||||
contentTopic: ContentTopic
|
||||
});
|
||||
import { runTest, setupTest } from "./sharedTestUtils.js";
|
||||
|
||||
describe("High Throughput Messaging", function () {
|
||||
const testDurationMs = 20 * 60 * 1000; // 20 minutes
|
||||
this.timeout(testDurationMs * 1.1);
|
||||
let waku: LightNode;
|
||||
let nwaku: ServiceNode;
|
||||
let messageCollector: MessageCollector;
|
||||
const testContext = {};
|
||||
|
||||
beforeEachCustom(this, async () => {
|
||||
nwaku = new ServiceNode(makeLogFileName(this.ctx));
|
||||
messageCollector = new MessageCollector(nwaku);
|
||||
});
|
||||
setupTest(this, testContext);
|
||||
|
||||
afterEachCustom(this, async () => {
|
||||
await tearDownNodes(nwaku, waku);
|
||||
});
|
||||
|
||||
it("Send/Receive thousands of messages quickly", async function () {
|
||||
const testStart = new Date();
|
||||
const testEnd = Date.now() + testDurationMs;
|
||||
|
||||
const report: {
|
||||
messageId: number;
|
||||
timestamp: string;
|
||||
sent: boolean;
|
||||
received: boolean;
|
||||
error?: string;
|
||||
}[] = [];
|
||||
|
||||
await nwaku.start(
|
||||
{
|
||||
store: true,
|
||||
filter: true,
|
||||
relay: true,
|
||||
clusterId: NetworkConfig.clusterId,
|
||||
numShardsInNetwork: NetworkConfig.numShardsInCluster,
|
||||
contentTopic: [ContentTopic]
|
||||
},
|
||||
{ retries: 3 }
|
||||
);
|
||||
|
||||
await delay(1000);
|
||||
|
||||
await nwaku.ensureSubscriptions([
|
||||
contentTopicToPubsubTopic(
|
||||
ContentTopic,
|
||||
NetworkConfig.clusterId,
|
||||
NetworkConfig.numShardsInCluster
|
||||
)
|
||||
]);
|
||||
|
||||
waku = await createLightNode({ networkConfig: NetworkConfig });
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.waitForPeers([Protocols.Filter]);
|
||||
|
||||
const decoder = createDecoder(ContentTopic, RoutingInfo);
|
||||
const hasSubscribed = await waku.filter.subscribe(
|
||||
[decoder],
|
||||
messageCollector.callback
|
||||
);
|
||||
if (!hasSubscribed) throw new Error("Failed to subscribe from the start.");
|
||||
|
||||
let messageId = 0;
|
||||
|
||||
// Send messages as fast as possible until testEnd
|
||||
while (Date.now() < testEnd) {
|
||||
const now = new Date();
|
||||
const message = `msg-${messageId}`;
|
||||
let sent = false;
|
||||
let received = false;
|
||||
let err: string | undefined;
|
||||
|
||||
try {
|
||||
await nwaku.sendMessage(
|
||||
ServiceNode.toMessageRpcQuery({
|
||||
contentTopic: ContentTopic,
|
||||
payload: utf8ToBytes(message)
|
||||
}),
|
||||
RoutingInfo
|
||||
);
|
||||
sent = true;
|
||||
|
||||
received = await messageCollector.waitForMessages(1, {
|
||||
timeoutDuration: 2000
|
||||
});
|
||||
|
||||
if (received) {
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: message,
|
||||
expectedContentTopic: ContentTopic,
|
||||
expectedPubsubTopic: RoutingInfo.pubsubTopic
|
||||
});
|
||||
}
|
||||
} catch (e: any) {
|
||||
err = e.message || String(e);
|
||||
}
|
||||
|
||||
report.push({
|
||||
messageId,
|
||||
timestamp: now.toISOString(),
|
||||
sent,
|
||||
received,
|
||||
error: err
|
||||
});
|
||||
|
||||
messageId++;
|
||||
messageCollector.list = []; // clearing the message collector
|
||||
}
|
||||
|
||||
const failedMessages = report.filter(
|
||||
(m) => !m.sent || !m.received || m.error
|
||||
);
|
||||
|
||||
console.log("\n=== High Throughput Test Summary ===");
|
||||
console.log("Start time:", testStart.toISOString());
|
||||
console.log("End time:", new Date().toISOString());
|
||||
console.log("Total messages:", report.length);
|
||||
console.log("Failures:", failedMessages.length);
|
||||
|
||||
if (failedMessages.length > 0) {
|
||||
console.log("\n--- Failed Messages ---");
|
||||
for (const fail of failedMessages) {
|
||||
console.log(
|
||||
`#${fail.messageId} @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
expect(
|
||||
failedMessages.length,
|
||||
`Some messages failed: ${failedMessages.length}`
|
||||
).to.eq(0);
|
||||
runTest({
|
||||
testContext: testContext,
|
||||
testDurationMs: testDurationMs,
|
||||
testName: "High Throughput Messaging",
|
||||
messageGenerator: (messageId: number) => `High-Throughput-${messageId}`,
|
||||
delayBetweenMessagesMs: 0
|
||||
});
|
||||
});
|
||||
|
||||
@ -1,158 +1,16 @@
|
||||
import { LightNode, Protocols } from "@waku/interfaces";
|
||||
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
|
||||
import {
|
||||
contentTopicToPubsubTopic,
|
||||
createRoutingInfo,
|
||||
delay
|
||||
} from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
afterEachCustom,
|
||||
beforeEachCustom,
|
||||
makeLogFileName,
|
||||
MessageCollector,
|
||||
ServiceNode,
|
||||
tearDownNodes
|
||||
} from "../../tests/src/index.js";
|
||||
|
||||
const ContentTopic = "/waku/2/content/test.js";
|
||||
import { runTest, setupTest } from "./sharedTestUtils.js";
|
||||
|
||||
describe("Longevity", function () {
|
||||
const testDurationMs = 2 * 60 * 60 * 1000; // 2 hours
|
||||
this.timeout(testDurationMs * 1.1);
|
||||
let waku: LightNode;
|
||||
let nwaku: ServiceNode;
|
||||
let messageCollector: MessageCollector;
|
||||
const testContext = {};
|
||||
|
||||
beforeEachCustom(this, async () => {
|
||||
nwaku = new ServiceNode(makeLogFileName(this.ctx));
|
||||
messageCollector = new MessageCollector(nwaku);
|
||||
});
|
||||
setupTest(this, testContext);
|
||||
|
||||
afterEachCustom(this, async () => {
|
||||
await tearDownNodes(nwaku, waku);
|
||||
});
|
||||
|
||||
it("Filter - 2 hours", async function () {
|
||||
const networkConfig = { clusterId: 0, numShardsInCluster: 8 };
|
||||
|
||||
const testStart = new Date();
|
||||
|
||||
const testEnd = Date.now() + testDurationMs;
|
||||
|
||||
const report: {
|
||||
messageId: number;
|
||||
timestamp: string;
|
||||
sent: boolean;
|
||||
received: boolean;
|
||||
error?: string;
|
||||
}[] = [];
|
||||
|
||||
await nwaku.start(
|
||||
{
|
||||
store: true,
|
||||
filter: true,
|
||||
relay: true,
|
||||
clusterId: 0,
|
||||
shard: [0],
|
||||
contentTopic: [ContentTopic]
|
||||
},
|
||||
{ retries: 3 }
|
||||
);
|
||||
|
||||
await nwaku.ensureSubscriptions([
|
||||
contentTopicToPubsubTopic(
|
||||
ContentTopic,
|
||||
networkConfig.clusterId,
|
||||
networkConfig.numShardsInCluster
|
||||
)
|
||||
]);
|
||||
|
||||
waku = await createLightNode({ networkConfig });
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.waitForPeers([Protocols.Filter]);
|
||||
|
||||
const routingInfo = createRoutingInfo(networkConfig, {
|
||||
contentTopic: ContentTopic
|
||||
});
|
||||
const decoder = createDecoder(ContentTopic, routingInfo);
|
||||
const hasSubscribed = await waku.filter.subscribe(
|
||||
[decoder],
|
||||
messageCollector.callback
|
||||
);
|
||||
if (!hasSubscribed) throw new Error("Failed to subscribe from the start.");
|
||||
|
||||
let messageId = 0;
|
||||
|
||||
while (Date.now() < testEnd) {
|
||||
const now = new Date();
|
||||
const message = `ping-${messageId}`;
|
||||
let sent = false;
|
||||
let received = false;
|
||||
let err: string | undefined;
|
||||
|
||||
try {
|
||||
await nwaku.sendMessage(
|
||||
ServiceNode.toMessageRpcQuery({
|
||||
contentTopic: ContentTopic,
|
||||
payload: utf8ToBytes(message)
|
||||
}),
|
||||
routingInfo
|
||||
);
|
||||
sent = true;
|
||||
|
||||
received = await messageCollector.waitForMessages(1, {
|
||||
timeoutDuration: 5000
|
||||
});
|
||||
|
||||
if (received) {
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: message,
|
||||
expectedContentTopic: ContentTopic,
|
||||
expectedPubsubTopic: routingInfo.pubsubTopic
|
||||
});
|
||||
}
|
||||
} catch (e: any) {
|
||||
err = e.message || String(e);
|
||||
}
|
||||
|
||||
report.push({
|
||||
messageId,
|
||||
timestamp: now.toISOString(),
|
||||
sent,
|
||||
received,
|
||||
error: err
|
||||
});
|
||||
|
||||
messageId++;
|
||||
messageCollector.list = []; // clearing the message collector
|
||||
await delay(400);
|
||||
}
|
||||
|
||||
const failedMessages = report.filter(
|
||||
(m) => !m.sent || !m.received || m.error
|
||||
);
|
||||
|
||||
console.log("\n=== Longevity Test Summary ===");
|
||||
console.log("Start time:", testStart.toISOString());
|
||||
console.log("End time:", new Date().toISOString());
|
||||
console.log("Total messages:", report.length);
|
||||
console.log("Failures:", failedMessages.length);
|
||||
|
||||
if (failedMessages.length > 0) {
|
||||
console.log("\n--- Failed Messages ---");
|
||||
for (const fail of failedMessages) {
|
||||
console.log(
|
||||
`#${fail.messageId} @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
expect(
|
||||
failedMessages.length,
|
||||
`Some messages failed: ${failedMessages.length}`
|
||||
).to.eq(0);
|
||||
runTest({
|
||||
testContext: testContext,
|
||||
testDurationMs: testDurationMs,
|
||||
testName: "Longevity",
|
||||
messageGenerator: (messageId: number) => `Longevity-${messageId}`,
|
||||
delayBetweenMessagesMs: 400
|
||||
});
|
||||
});
|
||||
|
||||
26
packages/reliability-tests/tests/low-bandwidth.spec.ts
Normal file
26
packages/reliability-tests/tests/low-bandwidth.spec.ts
Normal file
@ -0,0 +1,26 @@
|
||||
import { execCommand, runTest, setupTest } from "./sharedTestUtils.js";
|
||||
|
||||
describe("Low Bandwith Test", function () {
|
||||
const testDurationMs = 10 * 60 * 1000; // 10 mins
|
||||
const testContext = {};
|
||||
|
||||
setupTest(this, testContext);
|
||||
|
||||
beforeEach(async () => {
|
||||
execCommand(
|
||||
"sudo tc qdisc add dev eth0 root tbf rate 1mbit burst 32kbit limit 12500"
|
||||
);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
execCommand("sudo tc qdisc del dev eth0 root");
|
||||
});
|
||||
|
||||
runTest({
|
||||
testContext: testContext,
|
||||
testDurationMs: testDurationMs,
|
||||
testName: "Low Bandwith Test",
|
||||
messageGenerator: (messageId: number) => `Low-Bandwith-${messageId}`,
|
||||
delayBetweenMessagesMs: 400
|
||||
});
|
||||
});
|
||||
26
packages/reliability-tests/tests/network-latency.spec.ts
Normal file
26
packages/reliability-tests/tests/network-latency.spec.ts
Normal file
@ -0,0 +1,26 @@
|
||||
import { execCommand, runTest, setupTest } from "./sharedTestUtils.js";
|
||||
|
||||
describe("Network Latency and Jitter Test", function () {
|
||||
const testDurationMs = 10 * 60 * 1000; // 10 mins
|
||||
const testContext = {};
|
||||
|
||||
setupTest(this, testContext);
|
||||
|
||||
beforeEach(async () => {
|
||||
execCommand(
|
||||
"sudo tc qdisc add dev eth0 root netem delay 300ms 50ms distribution normal"
|
||||
);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
execCommand("sudo tc qdisc del dev eth0 root netem");
|
||||
});
|
||||
|
||||
runTest({
|
||||
testContext: testContext,
|
||||
testDurationMs: testDurationMs,
|
||||
testName: "Network Latency and Jitter Test",
|
||||
messageGenerator: (messageId: number) => `Network-Latency-${messageId}`,
|
||||
delayBetweenMessagesMs: 400
|
||||
});
|
||||
});
|
||||
24
packages/reliability-tests/tests/packet-loss.spec.ts
Normal file
24
packages/reliability-tests/tests/packet-loss.spec.ts
Normal file
@ -0,0 +1,24 @@
|
||||
import { execCommand, runTest, setupTest } from "./sharedTestUtils.js";
|
||||
|
||||
describe("Packet Loss Test", function () {
|
||||
const testDurationMs = 10 * 60 * 1000; // 10 mins
|
||||
const testContext = {};
|
||||
|
||||
setupTest(this, testContext);
|
||||
|
||||
beforeEach(async () => {
|
||||
execCommand("sudo tc qdisc add dev eth0 root netem loss 2%");
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
execCommand("sudo tc qdisc del dev eth0 root netem");
|
||||
});
|
||||
|
||||
runTest({
|
||||
testContext: testContext,
|
||||
testDurationMs: testDurationMs,
|
||||
testName: "Packet Loss Test",
|
||||
messageGenerator: (messageId: number) => `Packet-Loss-${messageId}`,
|
||||
delayBetweenMessagesMs: 400
|
||||
});
|
||||
});
|
||||
271
packages/reliability-tests/tests/sharedTestUtils.ts
Normal file
271
packages/reliability-tests/tests/sharedTestUtils.ts
Normal file
@ -0,0 +1,271 @@
|
||||
import { execSync } from "child_process";
|
||||
|
||||
import { AutoSharding, LightNode, Protocols } from "@waku/interfaces";
|
||||
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
|
||||
import { createRoutingInfo, delay } from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
afterEachCustom,
|
||||
beforeEachCustom,
|
||||
makeLogFileName,
|
||||
MessageCollector,
|
||||
ServiceNode,
|
||||
tearDownNodes
|
||||
} from "../../tests/src/index.js";
|
||||
|
||||
export interface TestContext {
|
||||
waku?: LightNode;
|
||||
nwaku?: ServiceNode;
|
||||
messageCollector?: MessageCollector;
|
||||
report?: Array<{
|
||||
messageId: number;
|
||||
size?: number;
|
||||
timestamp: string;
|
||||
sent: boolean;
|
||||
received: boolean;
|
||||
error?: string;
|
||||
}>;
|
||||
}
|
||||
|
||||
/* eslint-disable no-console */
|
||||
|
||||
export function setupTest(ctx: Mocha.Suite, testContext: TestContext): void {
|
||||
beforeEachCustom(ctx, async () => {
|
||||
testContext.nwaku = new ServiceNode(makeLogFileName(ctx.ctx));
|
||||
testContext.messageCollector = new MessageCollector();
|
||||
});
|
||||
|
||||
afterEachCustom(ctx, async () => {
|
||||
if (testContext.nwaku && testContext.waku) {
|
||||
await tearDownNodes(testContext.nwaku, testContext.waku);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
export function printSizeDistributionReport(
|
||||
report: TestContext["report"]
|
||||
): void {
|
||||
if (!report) return;
|
||||
|
||||
const sizes = report.map((r) => r.size ?? 0);
|
||||
const sizesSet = new Set(sizes);
|
||||
if (sizesSet.size <= 1) return; // Only one size, no need to print distribution
|
||||
|
||||
const buckets = [10, 100, 1000, 10000, 100000];
|
||||
const sizeCounts: Record<number, number> = {};
|
||||
for (const entry of report) {
|
||||
if (entry.size !== undefined) {
|
||||
// Find closest bucket
|
||||
let closest = buckets[0];
|
||||
let minDiff = Math.abs(entry.size - buckets[0]);
|
||||
for (const b of buckets) {
|
||||
const diff = Math.abs(entry.size - b);
|
||||
if (diff < minDiff) {
|
||||
minDiff = diff;
|
||||
closest = b;
|
||||
}
|
||||
}
|
||||
sizeCounts[closest] = (sizeCounts[closest] || 0) + 1;
|
||||
}
|
||||
}
|
||||
console.log("\nMessage size distribution (mapped to fixed buckets):");
|
||||
for (const size of buckets) {
|
||||
console.log(`Size ${size} bytes: ${sizeCounts[size] || 0} messages`);
|
||||
}
|
||||
}
|
||||
|
||||
export interface RunTestOptions {
|
||||
testContext: TestContext;
|
||||
testDurationMs: number;
|
||||
testName: string;
|
||||
messageGenerator?: (messageId: number) => string;
|
||||
messageTimeoutMs?: number;
|
||||
delayBetweenMessagesMs?: number;
|
||||
}
|
||||
|
||||
export function runTest(options: RunTestOptions): void {
|
||||
const {
|
||||
testContext,
|
||||
testDurationMs,
|
||||
testName,
|
||||
messageGenerator,
|
||||
delayBetweenMessagesMs = 400
|
||||
} = options;
|
||||
|
||||
describe(testName, function () {
|
||||
this.timeout(testDurationMs * 1.1); // Timing out if test runs for 10% more than expected. Used to avoid hangs and freezes
|
||||
|
||||
it(testName, async function () {
|
||||
const clusterId = 2;
|
||||
const shards = [1];
|
||||
const numShardsInCluster = 8;
|
||||
|
||||
const contentTopic = "/waku/2/content/test.js";
|
||||
|
||||
const testStart = new Date();
|
||||
const testEnd = Date.now() + testDurationMs;
|
||||
|
||||
const testNetworkConfig: AutoSharding = {
|
||||
clusterId: clusterId,
|
||||
numShardsInCluster: numShardsInCluster
|
||||
};
|
||||
const testRoutingInfo = createRoutingInfo(testNetworkConfig, {
|
||||
contentTopic: contentTopic
|
||||
});
|
||||
|
||||
const report: {
|
||||
messageId: number;
|
||||
size?: number;
|
||||
timestamp: string;
|
||||
sent: boolean;
|
||||
received: boolean;
|
||||
error?: string;
|
||||
}[] = [];
|
||||
|
||||
await testContext.nwaku!.start(
|
||||
{
|
||||
store: true,
|
||||
filter: true,
|
||||
relay: true,
|
||||
clusterId,
|
||||
shard: shards,
|
||||
numShardsInNetwork: numShardsInCluster,
|
||||
contentTopic: [contentTopic]
|
||||
},
|
||||
{ retries: 3 }
|
||||
);
|
||||
|
||||
await delay(1000);
|
||||
|
||||
await testContext.nwaku!.ensureSubscriptions([
|
||||
testRoutingInfo.pubsubTopic
|
||||
]);
|
||||
|
||||
testContext.waku = await createLightNode({
|
||||
networkConfig: { clusterId, numShardsInCluster }
|
||||
});
|
||||
await testContext.waku.start();
|
||||
await testContext.waku.dial(
|
||||
await testContext.nwaku!.getMultiaddrWithId()
|
||||
);
|
||||
await testContext.waku.waitForPeers([Protocols.Filter]);
|
||||
|
||||
const decoder = createDecoder(contentTopic, testRoutingInfo);
|
||||
const hasSubscribed = await testContext.waku.filter.subscribe(
|
||||
[decoder],
|
||||
testContext.messageCollector!.callback
|
||||
);
|
||||
if (!hasSubscribed)
|
||||
throw new Error("Failed to subscribe from the start.");
|
||||
|
||||
let messageId = 0;
|
||||
|
||||
console.log("Received messages via filter:");
|
||||
|
||||
while (Date.now() < testEnd) {
|
||||
const now = new Date();
|
||||
const message = messageGenerator
|
||||
? messageGenerator(messageId)
|
||||
: `ping-${messageId}`;
|
||||
let sent = false;
|
||||
let received = false;
|
||||
let err: string | undefined;
|
||||
|
||||
try {
|
||||
await testContext.nwaku!.sendMessage(
|
||||
ServiceNode.toMessageRpcQuery({
|
||||
contentTopic,
|
||||
payload: utf8ToBytes(message)
|
||||
}),
|
||||
testRoutingInfo
|
||||
);
|
||||
sent = true;
|
||||
|
||||
received = await testContext.messageCollector!.waitForMessages(1, {
|
||||
timeoutDuration: 5000
|
||||
});
|
||||
|
||||
if (received) {
|
||||
testContext.messageCollector!.verifyReceivedMessage(0, {
|
||||
expectedMessageText: message,
|
||||
expectedContentTopic: contentTopic,
|
||||
expectedPubsubTopic: testRoutingInfo.pubsubTopic
|
||||
});
|
||||
}
|
||||
|
||||
console.log(
|
||||
JSON.stringify(testContext.messageCollector!.getMessage(0))
|
||||
);
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
} catch (e: any) {
|
||||
err = e.message || String(e);
|
||||
console.log(`Issue/Error/Failure for message: ${String(e)}`);
|
||||
}
|
||||
|
||||
report.push({
|
||||
messageId,
|
||||
size: message.length,
|
||||
timestamp: now.toISOString(),
|
||||
sent,
|
||||
received,
|
||||
error: err
|
||||
});
|
||||
|
||||
messageId++;
|
||||
testContext.messageCollector!.list = []; // clearing the message collector
|
||||
await delay(delayBetweenMessagesMs);
|
||||
}
|
||||
|
||||
testContext.report = report;
|
||||
|
||||
const failedMessages = report.filter(
|
||||
(m) => !m.sent || !m.received || m.error
|
||||
);
|
||||
|
||||
console.log(`\n=== ${testName} Summary ===`);
|
||||
console.log("Start time:", testStart.toISOString());
|
||||
console.log("End time:", new Date().toISOString());
|
||||
console.log("Total messages:", report.length);
|
||||
console.log("Failures:", failedMessages.length);
|
||||
|
||||
if (failedMessages.length > 0) {
|
||||
console.log("\n--- Failed Messages ---");
|
||||
for (const fail of failedMessages) {
|
||||
console.log(
|
||||
`#${fail.messageId} (size: ${fail.size} bytes) @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
expect(
|
||||
failedMessages.length,
|
||||
`Some messages failed: ${failedMessages.length}`
|
||||
).to.eq(0);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function generateRandomString(size: number): string {
|
||||
const chars =
|
||||
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
|
||||
|
||||
let result = "";
|
||||
for (let i = 0; i < size; i++) {
|
||||
result += chars.charAt(Math.floor(Math.random() * chars.length));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
export function execCommand(command: string): void {
|
||||
try {
|
||||
execSync(command);
|
||||
} catch (e) {
|
||||
console.warn(
|
||||
`Failed to execute command "${command}", continuing without it:`,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/* eslint-enable no-console */
|
||||
@ -1,187 +1,32 @@
|
||||
import { LightNode, Protocols } from "@waku/interfaces";
|
||||
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
|
||||
import {
|
||||
contentTopicToPubsubTopic,
|
||||
createRoutingInfo,
|
||||
delay
|
||||
} from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
generateRandomString,
|
||||
printSizeDistributionReport,
|
||||
runTest,
|
||||
setupTest
|
||||
} from "./sharedTestUtils.js";
|
||||
|
||||
import {
|
||||
afterEachCustom,
|
||||
beforeEachCustom,
|
||||
makeLogFileName,
|
||||
MessageCollector,
|
||||
ServiceNode,
|
||||
tearDownNodes
|
||||
} from "../../tests/src/index.js";
|
||||
|
||||
const ContentTopic = "/waku/2/content/test.throughput-sizes.js";
|
||||
|
||||
function generateRandomString(size: number): string {
|
||||
const chars =
|
||||
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
|
||||
|
||||
let result = "";
|
||||
for (let i = 0; i < size; i++) {
|
||||
result += chars.charAt(Math.floor(Math.random() * chars.length));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
const sizes = [10, 100, 1000, 10_000, 100_000]; // bytes
|
||||
|
||||
describe("Throughput Sanity Checks - Different Message Sizes", function () {
|
||||
const testDurationMs = 20 * 60 * 1000; // 20 minute
|
||||
this.timeout(testDurationMs * 1.1);
|
||||
let waku: LightNode;
|
||||
let nwaku: ServiceNode;
|
||||
let messageCollector: MessageCollector;
|
||||
const testContext: { report?: any } = {};
|
||||
|
||||
beforeEachCustom(this, async () => {
|
||||
nwaku = new ServiceNode(makeLogFileName(this.ctx));
|
||||
messageCollector = new MessageCollector(nwaku);
|
||||
setupTest(this, testContext);
|
||||
|
||||
afterEach(async () => {
|
||||
if (testContext.report) {
|
||||
printSizeDistributionReport(testContext.report);
|
||||
}
|
||||
});
|
||||
|
||||
afterEachCustom(this, async () => {
|
||||
await tearDownNodes(nwaku, waku);
|
||||
});
|
||||
|
||||
it("Send/Receive messages of varying sizes", async function () {
|
||||
const networkConfig = { clusterId: 0, numShardsInCluster: 8 };
|
||||
|
||||
const testStart = new Date();
|
||||
const testEnd = Date.now() + testDurationMs;
|
||||
|
||||
const sizes = [10, 100, 1000, 10_000, 100_000]; // bytes
|
||||
|
||||
await nwaku.start(
|
||||
{
|
||||
store: true,
|
||||
filter: true,
|
||||
relay: true,
|
||||
clusterId: 0,
|
||||
shard: [0],
|
||||
contentTopic: [ContentTopic]
|
||||
},
|
||||
{ retries: 3 }
|
||||
);
|
||||
|
||||
await delay(1000);
|
||||
|
||||
await nwaku.ensureSubscriptions([
|
||||
contentTopicToPubsubTopic(
|
||||
ContentTopic,
|
||||
networkConfig.clusterId,
|
||||
networkConfig.numShardsInCluster
|
||||
)
|
||||
]);
|
||||
|
||||
waku = await createLightNode({ networkConfig });
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.waitForPeers([Protocols.Filter]);
|
||||
|
||||
const routingInfo = createRoutingInfo(networkConfig, {
|
||||
contentTopic: ContentTopic
|
||||
});
|
||||
const decoder = createDecoder(ContentTopic, routingInfo);
|
||||
const hasSubscribed = await waku.filter.subscribe(
|
||||
[decoder],
|
||||
messageCollector.callback
|
||||
);
|
||||
if (!hasSubscribed) throw new Error("Failed to subscribe from the start.");
|
||||
|
||||
let messageId = 0;
|
||||
const report: {
|
||||
messageId: number;
|
||||
size: number;
|
||||
timestamp: string;
|
||||
sent: boolean;
|
||||
received: boolean;
|
||||
error?: string;
|
||||
}[] = [];
|
||||
|
||||
while (Date.now() < testEnd) {
|
||||
const now = new Date();
|
||||
// Pick a random size from sizes array
|
||||
runTest({
|
||||
testContext: testContext,
|
||||
testDurationMs: testDurationMs,
|
||||
testName: "Throughput Sanity Checks - Different Message Sizes",
|
||||
messageGenerator: (_messageId: number) => {
|
||||
const size = sizes[Math.floor(Math.random() * sizes.length)];
|
||||
const message = generateRandomString(size);
|
||||
let sent = false;
|
||||
let received = false;
|
||||
let err: string | undefined;
|
||||
|
||||
try {
|
||||
await nwaku.sendMessage(
|
||||
ServiceNode.toMessageRpcQuery({
|
||||
contentTopic: ContentTopic,
|
||||
payload: utf8ToBytes(message)
|
||||
}),
|
||||
routingInfo
|
||||
);
|
||||
sent = true;
|
||||
|
||||
received = await messageCollector.waitForMessages(1, {
|
||||
timeoutDuration: 3000
|
||||
});
|
||||
|
||||
if (received) {
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: message,
|
||||
expectedContentTopic: ContentTopic,
|
||||
expectedPubsubTopic: routingInfo.pubsubTopic
|
||||
});
|
||||
}
|
||||
} catch (e: any) {
|
||||
err = e.message || String(e);
|
||||
}
|
||||
|
||||
report.push({
|
||||
messageId,
|
||||
size,
|
||||
timestamp: now.toISOString(),
|
||||
sent,
|
||||
received,
|
||||
error: err
|
||||
});
|
||||
|
||||
messageId++;
|
||||
messageCollector.list = []; // clearing the message collector
|
||||
await delay(400);
|
||||
}
|
||||
|
||||
const failedMessages = report.filter(
|
||||
(m) => !m.sent || !m.received || m.error
|
||||
);
|
||||
|
||||
console.log("\n=== Throughput Sizes Test Summary ===");
|
||||
console.log("Start time:", testStart.toISOString());
|
||||
console.log("End time:", new Date().toISOString());
|
||||
console.log("Total messages:", report.length);
|
||||
console.log("Failures:", failedMessages.length);
|
||||
|
||||
// Additional size info
|
||||
const sizeCounts: Record<number, number> = {};
|
||||
for (const entry of report) {
|
||||
sizeCounts[entry.size] = (sizeCounts[entry.size] || 0) + 1;
|
||||
}
|
||||
console.log("\nMessage size distribution:");
|
||||
for (const size of Object.keys(sizeCounts).sort(
|
||||
(a, b) => Number(a) - Number(b)
|
||||
)) {
|
||||
console.log(`Size ${size} bytes: ${sizeCounts[Number(size)]} messages`);
|
||||
}
|
||||
|
||||
if (failedMessages.length > 0) {
|
||||
console.log("\n--- Failed Messages ---");
|
||||
for (const fail of failedMessages) {
|
||||
console.log(
|
||||
`#${fail.messageId} (size: ${fail.size} bytes) @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
expect(
|
||||
failedMessages.length,
|
||||
`Some messages failed: ${failedMessages.length}`
|
||||
).to.eq(0);
|
||||
return generateRandomString(size);
|
||||
},
|
||||
delayBetweenMessagesMs: 400
|
||||
});
|
||||
});
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user