import { DecodedMessage } from "@waku/core"; import { NetworkConfig } from "@waku/interfaces"; import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils"; import { expect } from "chai"; import { DefaultTestPubsubTopic } from "../constants.js"; import { Args, MessageRpcQuery, MessageRpcResponse } from "../types.js"; import { delay, makeLogFileName } from "../utils/index.js"; import { MessageCollector } from "./message_collector.js"; import { runNodes } from "./runNodes.js"; import { defaultArgs, ServiceNode } from "./service_node.js"; export { ServiceNode, MessageCollector, defaultArgs }; export { runNodes }; const log = new Logger("test:message-collector"); /** * This class is a wrapper over the ServiceNode & MessageCollector class * that allows for the creation & handling of multiple ServiceNodes */ export class ServiceNodesFleet { public static async createAndRun( mochaContext: Mocha.Context, nodesToCreate: number = 3, strictChecking: boolean = false, networkConfig: NetworkConfig, _args?: Args, withoutFilter = false ): Promise { const nodes: ServiceNode[] = []; for (let i = 0; i < nodesToCreate; i++) { const node = new ServiceNode( makeLogFileName(mochaContext) + Math.random().toString(36).substring(7) ); const args = getArgs(networkConfig, _args); // If this is not the first node and previous node had a nodekey, use its multiaddr as static node if (i > 0) { const prevNode = nodes[i - 1]; const multiaddr = await prevNode.getExternalWebsocketMultiaddr(); args.staticnode = multiaddr; } await node.start(args, { retries: 3 }); nodes.push(node); } return new ServiceNodesFleet(nodes, withoutFilter, strictChecking); } /** * Convert a [[WakuMessage]] to a [[WakuRelayMessage]]. The latter is used * by the nwaku JSON-RPC API. */ public static toMessageRpcQuery(message: { payload: Uint8Array; contentTopic: string; timestamp?: Date; }): MessageRpcQuery { return ServiceNode.toMessageRpcQuery(message); } public messageCollector: MultipleNodesMessageCollector; private constructor( public nodes: ServiceNode[], relay: boolean, private strictChecking: boolean ) { const _messageCollectors: MessageCollector[] = []; this.nodes.forEach((node) => { _messageCollectors.push(new MessageCollector(node)); }); this.messageCollector = new MultipleNodesMessageCollector( _messageCollectors, relay ? this.nodes : undefined, strictChecking ); } public get type(): "go-waku" | "nwaku" { const nodeType = new Set( this.nodes.map((node) => { return node.type; }) ); if (nodeType.size > 1) { throw new Error("Multiple node types"); } return nodeType.values().next().value; } public async start(): Promise { const startPromises = this.nodes.map((node) => node.start()); await Promise.all(startPromises); } public async sendRelayMessage( message: MessageRpcQuery, pubsubTopic: string = DefaultTestPubsubTopic ): Promise { const relayMessagePromises: Promise[] = this.nodes.map((node) => node.sendMessage(message, pubsubTopic) ); const relayMessages = await Promise.all(relayMessagePromises); return relayMessages.every((message) => message); } public async confirmMessageLength( numMessages: number, { encryptedPayload }: { encryptedPayload?: boolean } = { encryptedPayload: false } ): Promise { if (encryptedPayload) { const filteredMessageList = Array.from( new Set( this.messageCollector.messageList .filter((msg) => msg.payload?.toString) .map((msg) => msg.payload.toString()) ) ); expect(filteredMessageList.length).to.equal(numMessages); return; } if (this.strictChecking) { await Promise.all( this.nodes.map(async (node) => expect(await node.messages()).to.have.length(numMessages) ) ); } else { // Wait for all promises to resolve and check if any meets the condition const results = await Promise.all( this.nodes.map(async (node) => { const msgs = await node.messages(); return msgs.length === numMessages; }) ); // Check if at least one result meets the condition const conditionMet = results.some((result) => result); expect(conditionMet).to.be.true; } } } class MultipleNodesMessageCollector { public callback: (msg: DecodedMessage) => void = () => {}; public readonly messageList: Array = []; public constructor( private messageCollectors: MessageCollector[], private relayNodes?: ServiceNode[], private strictChecking: boolean = false ) { this.callback = (msg: DecodedMessage): void => { log.info("Got a message"); this.messageList.push(msg); }; } public get count(): number { return this.messageList.length; } public hasMessage(topic: string, text: string): boolean { if (this.strictChecking) { return this.messageCollectors.every((collector) => collector.hasMessage(topic, text) ); } else { return this.messageCollectors.some((collector) => collector.hasMessage(topic, text) ); } } public getMessage(index: number): MessageRpcResponse | DecodedMessage { return this.messageList[index]; } /** * Verifies a received message against expected values on all nodes. * Returns true if any node's collector verifies the message successfully. */ public verifyReceivedMessage( index: number, options: { expectedMessageText: string | Uint8Array | undefined; expectedContentTopic?: string; expectedPubsubTopic?: string; expectedVersion?: number; expectedMeta?: Uint8Array; expectedEphemeral?: boolean; expectedTimestamp?: bigint | number; checkTimestamp?: boolean; } ): boolean { if (this.strictChecking) { return this.messageCollectors.every((collector, _i) => { try { collector.verifyReceivedMessage(index, options); return true; } catch (error) { return false; } }); } else { return this.messageCollectors.some((collector, _i) => { try { collector.verifyReceivedMessage(index, options); return true; } catch (error) { return false; } }); } } /** * Waits for a total number of messages across all nodes. */ public async waitForMessages( numMessages: number, options?: { pubsubTopic?: string; timeoutDuration?: number; exact?: boolean; } ): Promise { const pubsubTopic = options?.pubsubTopic || DefaultTestPubsubTopic; const timeoutDuration = options?.timeoutDuration || 400; const maxTimeout = Math.min(timeoutDuration * numMessages, 30000); const exact = options?.exact || false; try { const timeoutPromise = new Promise((resolve) => { setTimeout(() => { log.warn(`Timeout waiting for messages after ${maxTimeout}ms`); resolve(false); }, maxTimeout); }); const checkMessagesPromise = new Promise((resolve) => { const checkMessages = (): void => { // Check local messages if (this.messageList.length >= numMessages) { if (exact && this.messageList.length !== numMessages) { log.warn( `Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}` ); resolve(false); return; } resolve(true); return; } if (this.relayNodes) { void Promise.all( this.relayNodes.map((node) => node.messages(pubsubTopic)) ).then((nodeMessages) => { const hasEnoughMessages = this.strictChecking ? nodeMessages.every((msgs) => msgs.length >= numMessages) : nodeMessages.some((msgs) => msgs.length >= numMessages); if (hasEnoughMessages) { resolve(true); return; } setTimeout(checkMessages, 100); }); } else { setTimeout(checkMessages, 100); } }; // Start checking checkMessages(); }); // Race between timeout and message checking return Promise.race([timeoutPromise, checkMessagesPromise]); } catch (error) { log.error("Error in waitForMessages:", error); return false; } } /** * Waits for a total number of messages across all nodes using autosharding. */ public async waitForMessagesAutosharding( numMessages: number, options?: { contentTopic: string; timeoutDuration?: number; exact?: boolean; } ): Promise { const startTime = Date.now(); const timeoutDuration = options?.timeoutDuration || 400; const exact = options?.exact || false; while (this.messageList.length < numMessages) { if (this.relayNodes) { if (this.strictChecking) { // In strict mode, all nodes must have the messages const results = await Promise.all( this.messageCollectors.map(async (collector) => { return collector.waitForMessagesAutosharding( numMessages, options ); }) ); if (results.every((result) => result)) { return true; } } else { // In non-strict mode, at least one node must have the messages const results = await Promise.all( this.messageCollectors.map(async (collector) => { return collector.waitForMessagesAutosharding( numMessages, options ); }) ); if (results.some((result) => result)) { return true; } } if (Date.now() - startTime > timeoutDuration * numMessages) { return false; } await delay(10); } else { // If no relay nodes, just wait for messages in the list if (Date.now() - startTime > timeoutDuration * numMessages) { return false; } await delay(10); } } if (exact) { if (this.messageList.length == numMessages) { return true; } else { log.warn( `Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}` ); return false; } } else { return true; } } } function getArgs(networkConfig: NetworkConfig, args?: Args): Args { const pubsubTopics = derivePubsubTopicsFromNetworkConfig(networkConfig); const defaultArgs = { lightpush: true, filter: true, discv5Discovery: true, peerExchange: true, relay: true, pubsubTopic: pubsubTopics, clusterId: networkConfig.clusterId } as Args; return { ...defaultArgs, ...args }; }