chore(lightpush): use multiple service nodes for lightpush (instead of just one)

- nwaku now expects >=1 nodes at least connected
This commit is contained in:
Danish Arora 2025-01-17 15:47:36 +05:30
parent bcea5cf04a
commit 661d274b90
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E

View File

@ -16,24 +16,20 @@ import {
} from "@waku/utils"; } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes"; import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai"; import { expect } from "chai";
import { Context } from "mocha";
import { import {
afterEachCustom, afterEachCustom,
beforeEachCustom, beforeEachCustom,
makeLogFileName, runMultipleNodes,
MessageCollector, ServiceNodesFleet,
ServiceNode,
tearDownNodes tearDownNodes
} from "../../../src/index.js"; } from "../../../src/index.js";
import { messageText, runNodes } from "../utils.js"; import { messageText } from "../utils.js";
describe("Waku Light Push : Multiple PubsubTopics", function () { describe("Waku Light Push : Multiple PubsubTopics", function () {
this.timeout(30000); this.timeout(30000);
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let serviceNodes: ServiceNodesFleet;
let nwaku2: ServiceNode;
let messageCollector: MessageCollector;
const shardInfo: ShardInfo = { clusterId: 3, shards: [1, 2] }; const shardInfo: ShardInfo = { clusterId: 3, shards: [1, 2] };
const singleShardInfo1: SingleShardInfo = { clusterId: 3, shard: 1 }; const singleShardInfo1: SingleShardInfo = { clusterId: 3, shard: 1 };
@ -55,13 +51,19 @@ describe("Waku Light Push : Multiple PubsubTopics", function () {
let node1PeerId: PeerId; let node1PeerId: PeerId;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo); [serviceNodes, waku] = await runMultipleNodes(
messageCollector = new MessageCollector(nwaku); this.ctx,
node1PeerId = await nwaku.getPeerId(); shardInfo,
undefined,
true,
2,
true
);
node1PeerId = await serviceNodes.nodes[0].getPeerId();
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], waku); await tearDownNodes(serviceNodes.nodes, waku);
}); });
it("Push message on custom pubsubTopic", async function () { it("Push message on custom pubsubTopic", async function () {
@ -72,11 +74,11 @@ describe("Waku Light Push : Multiple PubsubTopics", function () {
expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString());
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic1 pubsubTopic: customPubsubTopic1
}) })
).to.eq(true); ).to.eq(true);
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText, expectedMessageText: messageText,
expectedContentTopic: customContentTopic1 expectedContentTopic: customContentTopic1
}); });
@ -92,50 +94,48 @@ describe("Waku Light Push : Multiple PubsubTopics", function () {
expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString());
expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString());
const messageCollector2 = new MessageCollector(nwaku);
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic1 pubsubTopic: customPubsubTopic1
}) })
).to.eq(true); ).to.eq(true);
expect( expect(
await messageCollector2.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic2 pubsubTopic: customPubsubTopic2
}) })
).to.eq(true); ).to.eq(true);
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1", expectedMessageText: "M1",
expectedContentTopic: customContentTopic1, expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1 expectedPubsubTopic: customPubsubTopic1
}); });
messageCollector2.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2", expectedMessageText: "M2",
expectedContentTopic: customContentTopic2, expectedContentTopic: customContentTopic2,
expectedPubsubTopic: customPubsubTopic2 expectedPubsubTopic: customPubsubTopic2
}); });
}); });
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { it("Light push messages to 2 service nodes each with different pubsubtopics", async function () {
// Set up and start a new nwaku node with Default PubsubTopic const [serviceNodes2, waku2] = await runMultipleNodes(
nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); this.ctx,
await nwaku2.start({ {
filter: true, clusterId: singleShardInfo2.clusterId,
lightpush: true, shards: [singleShardInfo2.shard!]
relay: true, },
pubsubTopic: [singleShardInfoToPubsubTopic(singleShardInfo2)], undefined,
clusterId: singleShardInfo2.clusterId true,
}); 1
await nwaku2.ensureSubscriptions([ );
await serviceNodes2.nodes[0].ensureSubscriptions([
singleShardInfoToPubsubTopic(singleShardInfo2) singleShardInfoToPubsubTopic(singleShardInfo2)
]); ]);
await waku.dial(await nwaku2.getMultiaddrWithId()); await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]); await waku.waitForPeers([Protocols.LightPush]);
const messageCollector2 = new MessageCollector(nwaku2);
await waku.lightPush.send(customEncoder1, { await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1") payload: utf8ToBytes("M1")
}); });
@ -143,33 +143,33 @@ describe("Waku Light Push : Multiple PubsubTopics", function () {
payload: utf8ToBytes("M2") payload: utf8ToBytes("M2")
}); });
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic1 pubsubTopic: customPubsubTopic1
}); });
await serviceNodes2.messageCollector.waitForMessages(1, {
await messageCollector2.waitForMessages(1, {
pubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) pubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2)
}); });
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1", expectedMessageText: "M1",
expectedContentTopic: customContentTopic1, expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1 expectedPubsubTopic: customPubsubTopic1
}); });
messageCollector2.verifyReceivedMessage(0, { serviceNodes2.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M2", expectedMessageText: "M2",
expectedContentTopic: customContentTopic2, expectedContentTopic: customContentTopic2,
expectedPubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) expectedPubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2)
}); });
// Clean up second fleet
await tearDownNodes(serviceNodes2.nodes, waku2);
}); });
}); });
describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
this.timeout(30000); this.timeout(30000);
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let serviceNodes: ServiceNodesFleet;
let nwaku2: ServiceNode;
let messageCollector: MessageCollector;
const clusterId = 4; const clusterId = 4;
const customContentTopic1 = "/waku/2/content/test.js"; const customContentTopic1 = "/waku/2/content/test.js";
@ -198,13 +198,19 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
let node1PeerId: PeerId; let node1PeerId: PeerId;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo); [serviceNodes, waku] = await runMultipleNodes(
messageCollector = new MessageCollector(nwaku); this.ctx,
node1PeerId = await nwaku.getPeerId(); shardInfo,
undefined,
true,
2,
true
);
node1PeerId = await serviceNodes.nodes[0].getPeerId();
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], waku); await tearDownNodes(serviceNodes.nodes, waku);
}); });
it("Push message on custom pubsubTopic", async function () { it("Push message on custom pubsubTopic", async function () {
@ -212,15 +218,14 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
payload: utf8ToBytes(messageText) payload: utf8ToBytes(messageText)
}); });
expect(pushResponse.failures).to.be.empty;
expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString());
expect( expect(
await messageCollector.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessages(1, {
contentTopic: customContentTopic1 pubsubTopic: autoshardingPubsubTopic1
}) })
).to.eq(true); ).to.eq(true);
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText, expectedMessageText: messageText,
expectedContentTopic: customContentTopic1 expectedContentTopic: customContentTopic1
}); });
@ -236,47 +241,45 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString());
expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString());
const messageCollector2 = new MessageCollector(nwaku);
expect( expect(
await messageCollector.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessages(1, {
contentTopic: customContentTopic1 pubsubTopic: autoshardingPubsubTopic1
}) })
).to.eq(true); ).to.eq(true);
expect( expect(
await messageCollector2.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessages(1, {
contentTopic: customContentTopic2 pubsubTopic: autoshardingPubsubTopic2
}) })
).to.eq(true); ).to.eq(true);
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1", expectedMessageText: "M1",
expectedContentTopic: customContentTopic1, expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1 expectedPubsubTopic: autoshardingPubsubTopic1
}); });
messageCollector2.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2", expectedMessageText: "M2",
expectedContentTopic: customContentTopic2, expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2 expectedPubsubTopic: autoshardingPubsubTopic2
}); });
}); });
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { it("Light push messages to 2 service nodes each with different pubsubtopics", async function () {
// Set up and start a new nwaku node with Default PubsubTopic // Create a second fleet for the second pubsub topic
nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); const [serviceNodes2, waku2] = await runMultipleNodes(
await nwaku2.start({ this.ctx,
filter: true, { clusterId, contentTopics: [customContentTopic2] },
lightpush: true, undefined,
relay: true, true,
pubsubTopic: [autoshardingPubsubTopic2], 1 // Only need one node for second fleet
clusterId: shardInfo.clusterId );
});
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
const messageCollector2 = new MessageCollector(nwaku2); await serviceNodes2.nodes[0].ensureSubscriptionsAutosharding([
customContentTopic2
]);
await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
await waku.lightPush.send(customEncoder1, { await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1") payload: utf8ToBytes("M1")
@ -285,34 +288,33 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
payload: utf8ToBytes("M2") payload: utf8ToBytes("M2")
}); });
await messageCollector.waitForMessagesAutosharding(1, { await serviceNodes.messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1 contentTopic: customContentTopic1
}); });
await messageCollector2.waitForMessagesAutosharding(1, { await serviceNodes2.messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic2 contentTopic: customContentTopic2
}); });
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1", expectedMessageText: "M1",
expectedContentTopic: customContentTopic1, expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1 expectedPubsubTopic: autoshardingPubsubTopic1
}); });
messageCollector2.verifyReceivedMessage(0, { serviceNodes2.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M2", expectedMessageText: "M2",
expectedContentTopic: customContentTopic2, expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2 expectedPubsubTopic: autoshardingPubsubTopic2
}); });
// Clean up second fleet
await tearDownNodes(serviceNodes2.nodes, waku2);
}); });
}); });
describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () { describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () {
this.timeout(30000); this.timeout(30000);
let waku: LightNode; let waku: LightNode;
let waku2: LightNode; let serviceNodes: ServiceNodesFleet;
let nwaku: ServiceNode;
let nwaku2: ServiceNode;
let messageCollector: MessageCollector;
let ctx: Context;
const clusterId = 3; const clusterId = 3;
const customContentTopic1 = "/waku/2/content/utf8"; const customContentTopic1 = "/waku/2/content/utf8";
@ -355,14 +357,19 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
let node1PeerId: PeerId; let node1PeerId: PeerId;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
ctx = this.ctx; [serviceNodes, waku] = await runMultipleNodes(
[nwaku, waku] = await runNodes(ctx, testShardInfo); this.ctx,
messageCollector = new MessageCollector(nwaku); testShardInfo,
node1PeerId = await nwaku.getPeerId(); undefined,
true,
2,
true
);
node1PeerId = await serviceNodes.nodes[0].getPeerId();
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes([nwaku, nwaku2], [waku, waku2]); await tearDownNodes(serviceNodes.nodes, waku);
}); });
it("Push message on custom pubsubTopic", async function () { it("Push message on custom pubsubTopic", async function () {
@ -373,11 +380,11 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString());
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic1 pubsubTopic: autoshardingPubsubTopic1
}) })
).to.eq(true); ).to.eq(true);
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText, expectedMessageText: messageText,
expectedContentTopic: customContentTopic1 expectedContentTopic: customContentTopic1
}); });
@ -393,68 +400,71 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString());
expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString());
const messageCollector2 = new MessageCollector(nwaku);
expect( expect(
await messageCollector.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic1 pubsubTopic: autoshardingPubsubTopic1
}) })
).to.eq(true); ).to.eq(true);
expect( expect(
await messageCollector2.waitForMessages(1, { await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic2 pubsubTopic: autoshardingPubsubTopic2
}) })
).to.eq(true); ).to.eq(true);
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1", expectedMessageText: "M1",
expectedContentTopic: customContentTopic1, expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1 expectedPubsubTopic: autoshardingPubsubTopic1
}); });
messageCollector2.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2", expectedMessageText: "M2",
expectedContentTopic: customContentTopic2, expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2 expectedPubsubTopic: autoshardingPubsubTopic2
}); });
}); });
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { it("Light push messages to 2 service nodes each with different pubsubtopics", async function () {
// Set up and start a new nwaku node with Default PubsubTopic const [serviceNodes2, waku2] = await runMultipleNodes(
[nwaku2] = await runNodes(ctx, shardInfo2); this.ctx,
shardInfo2,
undefined,
true,
1
);
await nwaku2.ensureSubscriptions([autoshardingPubsubTopic2]); await serviceNodes2.nodes[0].ensureSubscriptions([
await waku.dial(await nwaku2.getMultiaddrWithId()); autoshardingPubsubTopic2
]);
await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]); await waku.waitForPeers([Protocols.LightPush]);
const messageCollector2 = new MessageCollector(nwaku2); await waku.lightPush.send(customEncoder1, {
const { failures: f1 } = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1") payload: utf8ToBytes("M1")
}); });
const { failures: f2 } = await waku.lightPush.send(customEncoder2, { await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2") payload: utf8ToBytes("M2")
}); });
expect(f1).to.be.empty; await serviceNodes.messageCollector.waitForMessages(1, {
expect(f2).to.be.empty;
await messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic1 pubsubTopic: autoshardingPubsubTopic1
}); });
await messageCollector2.waitForMessages(1, { await serviceNodes2.messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic2 pubsubTopic: autoshardingPubsubTopic2
}); });
messageCollector.verifyReceivedMessage(0, { serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1", expectedMessageText: "M1",
expectedContentTopic: customContentTopic1, expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1 expectedPubsubTopic: autoshardingPubsubTopic1
}); });
messageCollector2.verifyReceivedMessage(0, { serviceNodes2.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M2", expectedMessageText: "M2",
expectedContentTopic: customContentTopic2, expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2 expectedPubsubTopic: autoshardingPubsubTopic2
}); });
// Clean up second fleet
await tearDownNodes(serviceNodes2.nodes, waku2);
}); });
}); });