fbarbu15 c1f9471cd7
chore: extra reliability tests 2 (#2450)
* chore: add throughput reliability tests

* chore: add capability to run all

* chore: add network-latency reliability tests

* chore: add network-latency reliability tests

* chore: add other network reliability tests

* chore: add other network reliability tests

* chore: fix tc cleanup

* chore: refactor common code

* chore: refactor common code

* chore: refactor common code

* chore: refactor common code

* chore: refactor common code

* chore: refactor common code

* chore: refactor common code

* chore: fix

* chore: fix tests

* chore: fix tests
2025-07-29 10:17:43 +03:00

272 lines
7.4 KiB
TypeScript

import { execSync } from "child_process";
import { AutoSharding, LightNode, Protocols } from "@waku/interfaces";
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
import { createRoutingInfo, delay } from "@waku/utils";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
makeLogFileName,
MessageCollector,
ServiceNode,
tearDownNodes
} from "../../tests/src/index.js";
export interface TestContext {
waku?: LightNode;
nwaku?: ServiceNode;
messageCollector?: MessageCollector;
report?: Array<{
messageId: number;
size?: number;
timestamp: string;
sent: boolean;
received: boolean;
error?: string;
}>;
}
/* eslint-disable no-console */
export function setupTest(ctx: Mocha.Suite, testContext: TestContext): void {
beforeEachCustom(ctx, async () => {
testContext.nwaku = new ServiceNode(makeLogFileName(ctx.ctx));
testContext.messageCollector = new MessageCollector();
});
afterEachCustom(ctx, async () => {
if (testContext.nwaku && testContext.waku) {
await tearDownNodes(testContext.nwaku, testContext.waku);
}
});
}
export function printSizeDistributionReport(
report: TestContext["report"]
): void {
if (!report) return;
const sizes = report.map((r) => r.size ?? 0);
const sizesSet = new Set(sizes);
if (sizesSet.size <= 1) return; // Only one size, no need to print distribution
const buckets = [10, 100, 1000, 10000, 100000];
const sizeCounts: Record<number, number> = {};
for (const entry of report) {
if (entry.size !== undefined) {
// Find closest bucket
let closest = buckets[0];
let minDiff = Math.abs(entry.size - buckets[0]);
for (const b of buckets) {
const diff = Math.abs(entry.size - b);
if (diff < minDiff) {
minDiff = diff;
closest = b;
}
}
sizeCounts[closest] = (sizeCounts[closest] || 0) + 1;
}
}
console.log("\nMessage size distribution (mapped to fixed buckets):");
for (const size of buckets) {
console.log(`Size ${size} bytes: ${sizeCounts[size] || 0} messages`);
}
}
export interface RunTestOptions {
testContext: TestContext;
testDurationMs: number;
testName: string;
messageGenerator?: (messageId: number) => string;
messageTimeoutMs?: number;
delayBetweenMessagesMs?: number;
}
export function runTest(options: RunTestOptions): void {
const {
testContext,
testDurationMs,
testName,
messageGenerator,
delayBetweenMessagesMs = 400
} = options;
describe(testName, function () {
this.timeout(testDurationMs * 1.1); // Timing out if test runs for 10% more than expected. Used to avoid hangs and freezes
it(testName, async function () {
const clusterId = 2;
const shards = [1];
const numShardsInCluster = 8;
const contentTopic = "/waku/2/content/test.js";
const testStart = new Date();
const testEnd = Date.now() + testDurationMs;
const testNetworkConfig: AutoSharding = {
clusterId: clusterId,
numShardsInCluster: numShardsInCluster
};
const testRoutingInfo = createRoutingInfo(testNetworkConfig, {
contentTopic: contentTopic
});
const report: {
messageId: number;
size?: number;
timestamp: string;
sent: boolean;
received: boolean;
error?: string;
}[] = [];
await testContext.nwaku!.start(
{
store: true,
filter: true,
relay: true,
clusterId,
shard: shards,
numShardsInNetwork: numShardsInCluster,
contentTopic: [contentTopic]
},
{ retries: 3 }
);
await delay(1000);
await testContext.nwaku!.ensureSubscriptions([
testRoutingInfo.pubsubTopic
]);
testContext.waku = await createLightNode({
networkConfig: { clusterId, numShardsInCluster }
});
await testContext.waku.start();
await testContext.waku.dial(
await testContext.nwaku!.getMultiaddrWithId()
);
await testContext.waku.waitForPeers([Protocols.Filter]);
const decoder = createDecoder(contentTopic, testRoutingInfo);
const hasSubscribed = await testContext.waku.filter.subscribe(
[decoder],
testContext.messageCollector!.callback
);
if (!hasSubscribed)
throw new Error("Failed to subscribe from the start.");
let messageId = 0;
console.log("Received messages via filter:");
while (Date.now() < testEnd) {
const now = new Date();
const message = messageGenerator
? messageGenerator(messageId)
: `ping-${messageId}`;
let sent = false;
let received = false;
let err: string | undefined;
try {
await testContext.nwaku!.sendMessage(
ServiceNode.toMessageRpcQuery({
contentTopic,
payload: utf8ToBytes(message)
}),
testRoutingInfo
);
sent = true;
received = await testContext.messageCollector!.waitForMessages(1, {
timeoutDuration: 5000
});
if (received) {
testContext.messageCollector!.verifyReceivedMessage(0, {
expectedMessageText: message,
expectedContentTopic: contentTopic,
expectedPubsubTopic: testRoutingInfo.pubsubTopic
});
}
console.log(
JSON.stringify(testContext.messageCollector!.getMessage(0))
);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (e: any) {
err = e.message || String(e);
console.log(`Issue/Error/Failure for message: ${String(e)}`);
}
report.push({
messageId,
size: message.length,
timestamp: now.toISOString(),
sent,
received,
error: err
});
messageId++;
testContext.messageCollector!.list = []; // clearing the message collector
await delay(delayBetweenMessagesMs);
}
testContext.report = report;
const failedMessages = report.filter(
(m) => !m.sent || !m.received || m.error
);
console.log(`\n=== ${testName} Summary ===`);
console.log("Start time:", testStart.toISOString());
console.log("End time:", new Date().toISOString());
console.log("Total messages:", report.length);
console.log("Failures:", failedMessages.length);
if (failedMessages.length > 0) {
console.log("\n--- Failed Messages ---");
for (const fail of failedMessages) {
console.log(
`#${fail.messageId} (size: ${fail.size} bytes) @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}`
);
}
}
expect(
failedMessages.length,
`Some messages failed: ${failedMessages.length}`
).to.eq(0);
});
});
}
export function generateRandomString(size: number): string {
const chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
let result = "";
for (let i = 0; i < size; i++) {
result += chars.charAt(Math.floor(Math.random() * chars.length));
}
return result;
}
export function execCommand(command: string): void {
try {
execSync(command);
} catch (e) {
console.warn(
`Failed to execute command "${command}", continuing without it:`,
e
);
}
}
/* eslint-enable no-console */