From bcea5cf04af817cc130043f376578f7b0e00b5eb Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Fri, 17 Jan 2025 15:46:53 +0530 Subject: [PATCH] feat: connect nwaku nodes amongst each other over relay --- packages/tests/src/lib/dockerode.ts | 2 +- packages/tests/src/lib/index.ts | 119 ++++++++++++++++++++----- packages/tests/src/lib/service_node.ts | 9 ++ 3 files changed, 108 insertions(+), 22 deletions(-) diff --git a/packages/tests/src/lib/dockerode.ts b/packages/tests/src/lib/dockerode.ts index 823a02859b..2428b67cb9 100644 --- a/packages/tests/src/lib/dockerode.ts +++ b/packages/tests/src/lib/dockerode.ts @@ -18,7 +18,7 @@ export default class Dockerode { public containerId?: string; private static network: Docker.Network; - private containerIp: string; + public readonly containerIp: string; private constructor(imageName: string, containerIp: string) { this.docker = new Docker(); diff --git a/packages/tests/src/lib/index.ts b/packages/tests/src/lib/index.ts index 8f804633a3..2a5c02acdf 100644 --- a/packages/tests/src/lib/index.ts +++ b/packages/tests/src/lib/index.ts @@ -29,24 +29,28 @@ export class ServiceNodesFleet { _args?: Args, withoutFilter = false ): Promise { - const serviceNodePromises = Array.from( - { length: nodesToCreate }, - async () => { - const node = new ServiceNode( - makeLogFileName(mochaContext) + - Math.random().toString(36).substring(7) - ); + const nodes: ServiceNode[] = []; - const args = getArgs(networkConfig, _args); - await node.start(args, { - retries: 3 - }); + for (let index = 0; index < nodesToCreate; index++) { + const node = new ServiceNode( + makeLogFileName(mochaContext) + Math.random().toString(36).substring(7) + ); - return node; + const args = getArgs(networkConfig, _args); + + // If this is not the first node and previous node had a nodekey, use its multiaddr as static node + if (index > 0) { + const prevNode = nodes[index - 1]; + const multiaddr = await prevNode.getExternalWebsocketMultiaddr(); + args.staticnode = multiaddr; } - ); - const nodes = await Promise.all(serviceNodePromises); + await node.start(args, { + retries: 3 + }); + + nodes.push(node); + } return new ServiceNodesFleet(nodes, withoutFilter, strictChecking); } @@ -182,21 +186,21 @@ class MultipleNodesMessageCollector { } ): boolean { if (this.strictChecking) { - return this.messageCollectors.every((collector) => { + return this.messageCollectors.every((collector, _i) => { try { collector.verifyReceivedMessage(index, options); - return true; // Verification successful + return true; } catch (error) { - return false; // Verification failed, continue with the next collector + return false; } }); } else { - return this.messageCollectors.some((collector) => { + return this.messageCollectors.some((collector, _i) => { try { collector.verifyReceivedMessage(index, options); - return true; // Verification successful + return true; } catch (error) { - return false; // Verification failed, continue with the next collector + return false; } }); } @@ -239,7 +243,8 @@ class MultipleNodesMessageCollector { } } - if (Date.now() - startTime > timeoutDuration * numMessages) { + const elapsed = Date.now() - startTime; + if (elapsed > timeoutDuration * numMessages) { return false; } @@ -253,7 +258,79 @@ class MultipleNodesMessageCollector { log.warn( `Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}` ); + return false; + } + } else { + return true; + } + } + /** + * Waits for a total number of messages across all nodes using autosharding. + */ + public async waitForMessagesAutosharding( + numMessages: number, + options?: { + contentTopic: string; + timeoutDuration?: number; + exact?: boolean; + } + ): Promise { + const startTime = Date.now(); + const timeoutDuration = options?.timeoutDuration || 400; + const exact = options?.exact || false; + + while (this.messageList.length < numMessages) { + if (this.relayNodes) { + if (this.strictChecking) { + // In strict mode, all nodes must have the messages + const results = await Promise.all( + this.messageCollectors.map(async (collector) => { + return collector.waitForMessagesAutosharding( + numMessages, + options + ); + }) + ); + if (results.every((result) => result)) { + return true; + } + } else { + // In non-strict mode, at least one node must have the messages + const results = await Promise.all( + this.messageCollectors.map(async (collector) => { + return collector.waitForMessagesAutosharding( + numMessages, + options + ); + }) + ); + if (results.some((result) => result)) { + return true; + } + } + + if (Date.now() - startTime > timeoutDuration * numMessages) { + return false; + } + + await delay(10); + } else { + // If no relay nodes, just wait for messages in the list + if (Date.now() - startTime > timeoutDuration * numMessages) { + return false; + } + await delay(10); + } + } + + if (exact) { + if (this.messageList.length == numMessages) { + return true; + } else { + log.warn( + `Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}` + ); return false; } } else { diff --git a/packages/tests/src/lib/service_node.ts b/packages/tests/src/lib/service_node.ts index b443fd6cb2..5dac5d318f 100644 --- a/packages/tests/src/lib/service_node.ts +++ b/packages/tests/src/lib/service_node.ts @@ -402,6 +402,15 @@ export class ServiceNode { throw `${this.type} container hasn't started`; } } + + public async getExternalWebsocketMultiaddr(): Promise { + if (!this.docker?.container) { + return undefined; + } + const containerIp = this.docker.containerIp; + const peerId = await this.getPeerId(); + return `/ip4/${containerIp}/tcp/${this.websocketPort}/ws/p2p/${peerId}`; + } } export function defaultArgs(): Args {