feat: connect nwaku nodes amongst each other over relay

This commit is contained in:
Danish Arora 2025-01-17 15:46:53 +05:30
parent ca9bd98058
commit bcea5cf04a
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
3 changed files with 108 additions and 22 deletions

View File

@ -18,7 +18,7 @@ export default class Dockerode {
public containerId?: string;
private static network: Docker.Network;
private containerIp: string;
public readonly containerIp: string;
private constructor(imageName: string, containerIp: string) {
this.docker = new Docker();

View File

@ -29,24 +29,28 @@ export class ServiceNodesFleet {
_args?: Args,
withoutFilter = false
): Promise<ServiceNodesFleet> {
const serviceNodePromises = Array.from(
{ length: nodesToCreate },
async () => {
const node = new ServiceNode(
makeLogFileName(mochaContext) +
Math.random().toString(36).substring(7)
);
const nodes: ServiceNode[] = [];
const args = getArgs(networkConfig, _args);
await node.start(args, {
retries: 3
});
for (let index = 0; index < nodesToCreate; index++) {
const node = new ServiceNode(
makeLogFileName(mochaContext) + Math.random().toString(36).substring(7)
);
return node;
const args = getArgs(networkConfig, _args);
// If this is not the first node and previous node had a nodekey, use its multiaddr as static node
if (index > 0) {
const prevNode = nodes[index - 1];
const multiaddr = await prevNode.getExternalWebsocketMultiaddr();
args.staticnode = multiaddr;
}
);
const nodes = await Promise.all(serviceNodePromises);
await node.start(args, {
retries: 3
});
nodes.push(node);
}
return new ServiceNodesFleet(nodes, withoutFilter, strictChecking);
}
@ -182,21 +186,21 @@ class MultipleNodesMessageCollector {
}
): boolean {
if (this.strictChecking) {
return this.messageCollectors.every((collector) => {
return this.messageCollectors.every((collector, _i) => {
try {
collector.verifyReceivedMessage(index, options);
return true; // Verification successful
return true;
} catch (error) {
return false; // Verification failed, continue with the next collector
return false;
}
});
} else {
return this.messageCollectors.some((collector) => {
return this.messageCollectors.some((collector, _i) => {
try {
collector.verifyReceivedMessage(index, options);
return true; // Verification successful
return true;
} catch (error) {
return false; // Verification failed, continue with the next collector
return false;
}
});
}
@ -239,7 +243,8 @@ class MultipleNodesMessageCollector {
}
}
if (Date.now() - startTime > timeoutDuration * numMessages) {
const elapsed = Date.now() - startTime;
if (elapsed > timeoutDuration * numMessages) {
return false;
}
@ -253,7 +258,79 @@ class MultipleNodesMessageCollector {
log.warn(
`Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}`
);
return false;
}
} else {
return true;
}
}
/**
* Waits for a total number of messages across all nodes using autosharding.
*/
public async waitForMessagesAutosharding(
numMessages: number,
options?: {
contentTopic: string;
timeoutDuration?: number;
exact?: boolean;
}
): Promise<boolean> {
const startTime = Date.now();
const timeoutDuration = options?.timeoutDuration || 400;
const exact = options?.exact || false;
while (this.messageList.length < numMessages) {
if (this.relayNodes) {
if (this.strictChecking) {
// In strict mode, all nodes must have the messages
const results = await Promise.all(
this.messageCollectors.map(async (collector) => {
return collector.waitForMessagesAutosharding(
numMessages,
options
);
})
);
if (results.every((result) => result)) {
return true;
}
} else {
// In non-strict mode, at least one node must have the messages
const results = await Promise.all(
this.messageCollectors.map(async (collector) => {
return collector.waitForMessagesAutosharding(
numMessages,
options
);
})
);
if (results.some((result) => result)) {
return true;
}
}
if (Date.now() - startTime > timeoutDuration * numMessages) {
return false;
}
await delay(10);
} else {
// If no relay nodes, just wait for messages in the list
if (Date.now() - startTime > timeoutDuration * numMessages) {
return false;
}
await delay(10);
}
}
if (exact) {
if (this.messageList.length == numMessages) {
return true;
} else {
log.warn(
`Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}`
);
return false;
}
} else {

View File

@ -402,6 +402,15 @@ export class ServiceNode {
throw `${this.type} container hasn't started`;
}
}
public async getExternalWebsocketMultiaddr(): Promise<string | undefined> {
if (!this.docker?.container) {
return undefined;
}
const containerIp = this.docker.containerIp;
const peerId = await this.getPeerId();
return `/ip4/${containerIp}/tcp/${this.websocketPort}/ws/p2p/${peerId}`;
}
}
export function defaultArgs(): Args {