mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-02 13:53:12 +00:00
* chore: add throughput reliability tests * chore: add capability to run all * chore: add network-latency reliability tests * chore: add network-latency reliability tests * chore: add other network reliability tests * chore: add other network reliability tests * chore: fix tc cleanup * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: refactor common code * chore: fix * chore: fix tests * chore: fix tests
272 lines
7.4 KiB
TypeScript
272 lines
7.4 KiB
TypeScript
import { execSync } from "child_process";
|
|
|
|
import { AutoSharding, LightNode, Protocols } from "@waku/interfaces";
|
|
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
|
|
import { createRoutingInfo, delay } from "@waku/utils";
|
|
import { expect } from "chai";
|
|
|
|
import {
|
|
afterEachCustom,
|
|
beforeEachCustom,
|
|
makeLogFileName,
|
|
MessageCollector,
|
|
ServiceNode,
|
|
tearDownNodes
|
|
} from "../../tests/src/index.js";
|
|
|
|
export interface TestContext {
|
|
waku?: LightNode;
|
|
nwaku?: ServiceNode;
|
|
messageCollector?: MessageCollector;
|
|
report?: Array<{
|
|
messageId: number;
|
|
size?: number;
|
|
timestamp: string;
|
|
sent: boolean;
|
|
received: boolean;
|
|
error?: string;
|
|
}>;
|
|
}
|
|
|
|
/* eslint-disable no-console */
|
|
|
|
export function setupTest(ctx: Mocha.Suite, testContext: TestContext): void {
|
|
beforeEachCustom(ctx, async () => {
|
|
testContext.nwaku = new ServiceNode(makeLogFileName(ctx.ctx));
|
|
testContext.messageCollector = new MessageCollector();
|
|
});
|
|
|
|
afterEachCustom(ctx, async () => {
|
|
if (testContext.nwaku && testContext.waku) {
|
|
await tearDownNodes(testContext.nwaku, testContext.waku);
|
|
}
|
|
});
|
|
}
|
|
|
|
export function printSizeDistributionReport(
|
|
report: TestContext["report"]
|
|
): void {
|
|
if (!report) return;
|
|
|
|
const sizes = report.map((r) => r.size ?? 0);
|
|
const sizesSet = new Set(sizes);
|
|
if (sizesSet.size <= 1) return; // Only one size, no need to print distribution
|
|
|
|
const buckets = [10, 100, 1000, 10000, 100000];
|
|
const sizeCounts: Record<number, number> = {};
|
|
for (const entry of report) {
|
|
if (entry.size !== undefined) {
|
|
// Find closest bucket
|
|
let closest = buckets[0];
|
|
let minDiff = Math.abs(entry.size - buckets[0]);
|
|
for (const b of buckets) {
|
|
const diff = Math.abs(entry.size - b);
|
|
if (diff < minDiff) {
|
|
minDiff = diff;
|
|
closest = b;
|
|
}
|
|
}
|
|
sizeCounts[closest] = (sizeCounts[closest] || 0) + 1;
|
|
}
|
|
}
|
|
console.log("\nMessage size distribution (mapped to fixed buckets):");
|
|
for (const size of buckets) {
|
|
console.log(`Size ${size} bytes: ${sizeCounts[size] || 0} messages`);
|
|
}
|
|
}
|
|
|
|
export interface RunTestOptions {
|
|
testContext: TestContext;
|
|
testDurationMs: number;
|
|
testName: string;
|
|
messageGenerator?: (messageId: number) => string;
|
|
messageTimeoutMs?: number;
|
|
delayBetweenMessagesMs?: number;
|
|
}
|
|
|
|
export function runTest(options: RunTestOptions): void {
|
|
const {
|
|
testContext,
|
|
testDurationMs,
|
|
testName,
|
|
messageGenerator,
|
|
delayBetweenMessagesMs = 400
|
|
} = options;
|
|
|
|
describe(testName, function () {
|
|
this.timeout(testDurationMs * 1.1); // Timing out if test runs for 10% more than expected. Used to avoid hangs and freezes
|
|
|
|
it(testName, async function () {
|
|
const clusterId = 2;
|
|
const shards = [1];
|
|
const numShardsInCluster = 8;
|
|
|
|
const contentTopic = "/waku/2/content/test.js";
|
|
|
|
const testStart = new Date();
|
|
const testEnd = Date.now() + testDurationMs;
|
|
|
|
const testNetworkConfig: AutoSharding = {
|
|
clusterId: clusterId,
|
|
numShardsInCluster: numShardsInCluster
|
|
};
|
|
const testRoutingInfo = createRoutingInfo(testNetworkConfig, {
|
|
contentTopic: contentTopic
|
|
});
|
|
|
|
const report: {
|
|
messageId: number;
|
|
size?: number;
|
|
timestamp: string;
|
|
sent: boolean;
|
|
received: boolean;
|
|
error?: string;
|
|
}[] = [];
|
|
|
|
await testContext.nwaku!.start(
|
|
{
|
|
store: true,
|
|
filter: true,
|
|
relay: true,
|
|
clusterId,
|
|
shard: shards,
|
|
numShardsInNetwork: numShardsInCluster,
|
|
contentTopic: [contentTopic]
|
|
},
|
|
{ retries: 3 }
|
|
);
|
|
|
|
await delay(1000);
|
|
|
|
await testContext.nwaku!.ensureSubscriptions([
|
|
testRoutingInfo.pubsubTopic
|
|
]);
|
|
|
|
testContext.waku = await createLightNode({
|
|
networkConfig: { clusterId, numShardsInCluster }
|
|
});
|
|
await testContext.waku.start();
|
|
await testContext.waku.dial(
|
|
await testContext.nwaku!.getMultiaddrWithId()
|
|
);
|
|
await testContext.waku.waitForPeers([Protocols.Filter]);
|
|
|
|
const decoder = createDecoder(contentTopic, testRoutingInfo);
|
|
const hasSubscribed = await testContext.waku.filter.subscribe(
|
|
[decoder],
|
|
testContext.messageCollector!.callback
|
|
);
|
|
if (!hasSubscribed)
|
|
throw new Error("Failed to subscribe from the start.");
|
|
|
|
let messageId = 0;
|
|
|
|
console.log("Received messages via filter:");
|
|
|
|
while (Date.now() < testEnd) {
|
|
const now = new Date();
|
|
const message = messageGenerator
|
|
? messageGenerator(messageId)
|
|
: `ping-${messageId}`;
|
|
let sent = false;
|
|
let received = false;
|
|
let err: string | undefined;
|
|
|
|
try {
|
|
await testContext.nwaku!.sendMessage(
|
|
ServiceNode.toMessageRpcQuery({
|
|
contentTopic,
|
|
payload: utf8ToBytes(message)
|
|
}),
|
|
testRoutingInfo
|
|
);
|
|
sent = true;
|
|
|
|
received = await testContext.messageCollector!.waitForMessages(1, {
|
|
timeoutDuration: 5000
|
|
});
|
|
|
|
if (received) {
|
|
testContext.messageCollector!.verifyReceivedMessage(0, {
|
|
expectedMessageText: message,
|
|
expectedContentTopic: contentTopic,
|
|
expectedPubsubTopic: testRoutingInfo.pubsubTopic
|
|
});
|
|
}
|
|
|
|
console.log(
|
|
JSON.stringify(testContext.messageCollector!.getMessage(0))
|
|
);
|
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
|
} catch (e: any) {
|
|
err = e.message || String(e);
|
|
console.log(`Issue/Error/Failure for message: ${String(e)}`);
|
|
}
|
|
|
|
report.push({
|
|
messageId,
|
|
size: message.length,
|
|
timestamp: now.toISOString(),
|
|
sent,
|
|
received,
|
|
error: err
|
|
});
|
|
|
|
messageId++;
|
|
testContext.messageCollector!.list = []; // clearing the message collector
|
|
await delay(delayBetweenMessagesMs);
|
|
}
|
|
|
|
testContext.report = report;
|
|
|
|
const failedMessages = report.filter(
|
|
(m) => !m.sent || !m.received || m.error
|
|
);
|
|
|
|
console.log(`\n=== ${testName} Summary ===`);
|
|
console.log("Start time:", testStart.toISOString());
|
|
console.log("End time:", new Date().toISOString());
|
|
console.log("Total messages:", report.length);
|
|
console.log("Failures:", failedMessages.length);
|
|
|
|
if (failedMessages.length > 0) {
|
|
console.log("\n--- Failed Messages ---");
|
|
for (const fail of failedMessages) {
|
|
console.log(
|
|
`#${fail.messageId} (size: ${fail.size} bytes) @ ${fail.timestamp} | sent: ${fail.sent} | received: ${fail.received} | error: ${fail.error || "N/A"}`
|
|
);
|
|
}
|
|
}
|
|
|
|
expect(
|
|
failedMessages.length,
|
|
`Some messages failed: ${failedMessages.length}`
|
|
).to.eq(0);
|
|
});
|
|
});
|
|
}
|
|
|
|
export function generateRandomString(size: number): string {
|
|
const chars =
|
|
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
|
|
|
|
let result = "";
|
|
for (let i = 0; i < size; i++) {
|
|
result += chars.charAt(Math.floor(Math.random() * chars.length));
|
|
}
|
|
return result;
|
|
}
|
|
|
|
export function execCommand(command: string): void {
|
|
try {
|
|
execSync(command);
|
|
} catch (e) {
|
|
console.warn(
|
|
`Failed to execute command "${command}", continuing without it:`,
|
|
e
|
|
);
|
|
}
|
|
}
|
|
|
|
/* eslint-enable no-console */
|