Concepts are being mixed up between the global network config (static vs auto sharding), which must be the same for all nodes in the network; individual node configuration (e.g. a relay node subscribing to a given shard); and the routing characteristics of a specific message (e.g. pubsub topic, shard). This blocks proper configuration against nwaku post-0.36.0, because we now need to be deliberate about whether nwaku nodes run with auto or static sharding. It also involved various back-and-forth conversions between shards, pubsub topics, etc.

With this change, we tidy up the network configuration and make it explicit whether it is static or auto sharded. We also introduce the concept of routing info, which is specific to a message and tied to the overall network configuration. Routing info abstracts pubsub topic, shard, and autosharding needs, which should make it easier to tidy up the pubsub concept at a later stage.

# Conflicts:
#   packages/core/src/lib/connection_manager/connection_manager.ts
#   packages/core/src/lib/metadata/metadata.ts
#   packages/interfaces/src/metadata.ts
#   packages/interfaces/src/sharding.ts
#   packages/relay/src/create.ts
#   packages/sdk/src/filter/filter.ts
#   packages/sdk/src/filter/types.ts
#   packages/sdk/src/light_push/light_push.spec.ts
#   packages/tests/tests/sharding/auto_sharding.spec.ts
#   packages/tests/tests/sharding/static_sharding.spec.ts

# Conflicts:
#   packages/sdk/src/store/store.ts
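To illustrate the separation the change describes, here is a minimal sketch of the three concepts. The type names, the `mode` discriminator, and the `RoutingInfo` fields are hypothetical, not the actual `@waku/interfaces` API; only the `{ clusterId, numShardsInCluster }` shape is taken from the tests below.

// Hedged sketch only; names and fields are assumptions, not the real API.

// Network-wide configuration: must be identical for every node in the network.
type StaticShardingConfig = {
  mode: "static"; // shards are fixed up front
  clusterId: number;
  numShardsInCluster: number;
};
type AutoShardingConfig = {
  mode: "auto"; // shards are derived from content topics
  clusterId: number;
  numShardsInCluster: number;
};
type NetworkConfig = StaticShardingConfig | AutoShardingConfig;

// Per-message routing info, tied to the network configuration: abstracts
// over pubsub topic, shard, and autosharding needs.
interface RoutingInfo {
  clusterId: number;
  shardId: number;
  pubsubTopic: string; // e.g. "/waku/2/rs/<clusterId>/<shardId>"
}

// Individual node configuration (e.g. which shards a relay node
// subscribes to) is a third, separate concern.
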
import { MetadataCodec } from "@waku/core";
import type { LightNode } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { decodeRelayShard } from "@waku/utils";
import chai, { expect } from "chai";
import chaiAsPromised from "chai-as-promised";

import {
  afterEachCustom,
  beforeEachCustom,
  delay,
  makeLogFileName,
  ServiceNode,
  tearDownNodes
} from "../src/index.js";

chai.use(chaiAsPromised);

describe("Metadata Protocol", function () {
|
|
this.timeout(55000);
|
|
let waku: LightNode;
|
|
let nwaku1: ServiceNode;
|
|
|
|
beforeEachCustom(this, async () => {
|
|
nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1");
|
|
});
|
|
|
|
afterEachCustom(this, async () => {
|
|
await tearDownNodes([nwaku1], waku);
|
|
});
|
|
|
|
describe("static sharding", function () {
|
|
it("same cluster, static sharding: nodes connect", async function () {
|
|
const clusterId = 2;
|
|
const shards = [1];
|
|
const numShardsInCluster = 8;
|
|
|
|
await nwaku1.start({
|
|
relay: true,
|
|
discv5Discovery: true,
|
|
peerExchange: true,
|
|
clusterId,
|
|
shard: shards,
|
|
numShardsInNetwork: numShardsInCluster
|
|
});
|
|
|
|
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
|
|
const nwaku1PeerId = await nwaku1.getPeerId();
|
|
|
|
waku = await createLightNode({
|
|
networkConfig: { clusterId, numShardsInCluster }
|
|
});
|
|
await waku.start();
|
|
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
|
|
|
|
if (!waku.libp2p.services.metadata) {
|
|
expect(waku.libp2p.services.metadata).to.not.be.undefined;
|
|
return;
|
|
}
|
|
|
|
const { error, shardInfo: shardInfoRes } =
|
|
await waku.libp2p.services.metadata.query(nwaku1PeerId);
|
|
|
|
if (error) {
|
|
expect(error).to.be.null;
|
|
return;
|
|
}
|
|
|
|
expect(shardInfoRes).to.not.be.undefined;
|
|
expect(shardInfoRes.clusterId).to.equal(clusterId);
|
|
expect(shardInfoRes.shards).to.include.members(shards);
|
|
|
|
const activeConnections = waku.libp2p.getConnections();
|
|
expect(activeConnections.length).to.equal(1);
|
|
});
|
|
|
|
it("different cluster: nodes don't connect", async function () {
|
|
const clusterIdNwaku = 2;
|
|
const custerIdJsWaku = 3;
|
|
const shards = [1];
|
|
const numShardsInCluster = 8;
|
|
|
|
await nwaku1.start({
|
|
relay: true,
|
|
discv5Discovery: true,
|
|
peerExchange: true,
|
|
clusterId: clusterIdNwaku,
|
|
shard: shards,
|
|
numShardsInNetwork: numShardsInCluster
|
|
});
|
|
|
|
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
|
|
|
|
waku = await createLightNode({
|
|
networkConfig: { clusterId: custerIdJsWaku, numShardsInCluster }
|
|
});
|
|
await waku.start();
|
|
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
|
|
|
|
// ensure the connection is closed from the other side
|
|
let counter = 0;
|
|
while (waku.libp2p.getConnections().length !== 0) {
|
|
if (counter > 10) {
|
|
break;
|
|
}
|
|
await delay(100);
|
|
counter++;
|
|
}
|
|
|
|
expect(waku.libp2p.getConnections().length).to.equal(0);
|
|
});
|
|
|
|
it("PeerStore has remote peer's shard info after successful connection", async function () {
|
|
const clusterId = 2;
|
|
const shards = [1];
|
|
const numShardsInCluster = 8;
|
|
|
|
await nwaku1.start({
|
|
relay: true,
|
|
discv5Discovery: true,
|
|
peerExchange: true,
|
|
clusterId,
|
|
shard: shards,
|
|
numShardsInNetwork: numShardsInCluster
|
|
});
|
|
|
|
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
|
|
const nwaku1PeerId = await nwaku1.getPeerId();
|
|
|
|
waku = await createLightNode({
|
|
networkConfig: { clusterId, numShardsInCluster }
|
|
});
|
|
await waku.start();
|
|
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
|
|
|
|
// delay to ensure the connection is estabilished and shardInfo is updated
|
|
await delay(500);
|
|
|
|
const encodedShardInfo = (
|
|
await waku.libp2p.peerStore.get(nwaku1PeerId)
|
|
).metadata.get("shardInfo");
|
|
expect(encodedShardInfo).to.not.be.undefined;
|
|
|
|
const metadataShardInfo = decodeRelayShard(encodedShardInfo!);
|
|
expect(metadataShardInfo).not.be.undefined;
|
|
|
|
expect(metadataShardInfo!.clusterId).to.eq(clusterId);
|
|
expect(metadataShardInfo.shards).to.include.members(shards);
|
|
});
|
|
|
|
it("receiving a ping from a peer does not overwrite shard info", async function () {
|
|
const clusterId = 2;
|
|
const shards = [1];
|
|
const numShardsInCluster = 8;
|
|
|
|
await nwaku1.start({
|
|
relay: true,
|
|
discv5Discovery: true,
|
|
peerExchange: true,
|
|
clusterId,
|
|
shard: shards
|
|
});
|
|
|
|
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
|
|
const nwaku1PeerId = await nwaku1.getPeerId();
|
|
|
|
waku = await createLightNode({
|
|
networkConfig: {
|
|
clusterId,
|
|
numShardsInCluster
|
|
},
|
|
connectionManager: {
|
|
pingKeepAlive: 1
|
|
}
|
|
});
|
|
await waku.start();
|
|
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
|
|
|
|
// delay to ensure the connection is estabilished, shardInfo is updated, and there is a ping
|
|
await delay(1500);
|
|
|
|
const metadata = (await waku.libp2p.peerStore.get(nwaku1PeerId)).metadata;
|
|
expect(metadata.get("shardInfo")).to.not.be.undefined;
|
|
|
|
const pingInfo = metadata.get("ping");
|
|
expect(pingInfo).to.not.be.undefined;
|
|
});
|
|
});
|
|
describe("auto sharding", function () {
|
|
it("same cluster: nodes connect", async function () {
|
|
const clusterId = 2;
|
|
const contentTopic = "/foo/1/bar/proto";
|
|
const numShardsInCluster = 0;
|
|
|
|
await nwaku1.start({
|
|
relay: true,
|
|
discv5Discovery: true,
|
|
peerExchange: true,
|
|
clusterId,
|
|
contentTopic: [contentTopic],
|
|
numShardsInNetwork: numShardsInCluster
|
|
});
|
|
|
|
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
|
|
const nwaku1PeerId = await nwaku1.getPeerId();
|
|
|
|
waku = await createLightNode({
|
|
networkConfig: { clusterId, numShardsInCluster }
|
|
});
|
|
await waku.start();
|
|
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
|
|
|
|
if (!waku.libp2p.services.metadata) {
|
|
expect(waku.libp2p.services.metadata).to.not.be.undefined;
|
|
return;
|
|
}
|
|
|
|
const { error, shardInfo: shardInfoRes } =
|
|
await waku.libp2p.services.metadata.query(nwaku1PeerId);
|
|
|
|
if (error) {
|
|
expect(error).to.be.null;
|
|
return;
|
|
}
|
|
|
|
expect(shardInfoRes).to.not.be.undefined;
|
|
expect(shardInfoRes.clusterId).to.equal(clusterId);
|
|
// TODO: calculate shards from content topics
|
|
//expect(shardInfoRes.shards).to.include.members(shards);
|
|
|
|
const activeConnections = waku.libp2p.getConnections();
|
|
expect(activeConnections.length).to.equal(1);
|
|
});
|
|
|
|
it("different cluster: nodes don't connect", async function () {
|
|
const clusterIdNwaku = 2;
|
|
const clusterIdJSWaku = 3;
|
|
const contentTopic = ["/foo/1/bar/proto"];
|
|
const numShardsInCluster = 0;
|
|
|
|
await nwaku1.start({
|
|
relay: true,
|
|
discv5Discovery: true,
|
|
peerExchange: true,
|
|
clusterId: clusterIdNwaku,
|
|
contentTopic,
|
|
numShardsInNetwork: numShardsInCluster
|
|
});
|
|
|
|
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
|
|
|
|
waku = await createLightNode({
|
|
networkConfig: {
|
|
clusterId: clusterIdJSWaku,
|
|
numShardsInCluster
|
|
}
|
|
});
|
|
await waku.start();
|
|
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
|
|
|
|
// ensure the connection is closed from the other side
|
|
let counter = 0;
|
|
while (waku.libp2p.getConnections().length !== 0) {
|
|
if (counter > 10) {
|
|
console.error("Connection was not closed");
|
|
break;
|
|
}
|
|
await delay(100);
|
|
counter++;
|
|
}
|
|
|
|
expect(waku.libp2p.getConnections().length).to.equal(0);
|
|
});
|
|
|
|
it("PeerStore has remote peer's shard info after successful connection", async function () {
|
|
const clusterId = 2;
|
|
const contentTopic = ["/foo/1/bar/proto"];
|
|
const numShardsInCluster = 0;
|
|
|
|
await nwaku1.start({
|
|
relay: true,
|
|
discv5Discovery: true,
|
|
peerExchange: true,
|
|
clusterId,
|
|
contentTopic
|
|
});
|
|
|
|
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
|
|
const nwaku1PeerId = await nwaku1.getPeerId();
|
|
|
|
waku = await createLightNode({
|
|
networkConfig: { clusterId, numShardsInCluster }
|
|
});
|
|
await waku.start();
|
|
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
|
|
|
|
// delay to ensure the connection is estabilished and shardInfo is updated
|
|
await delay(500);
|
|
|
|
const encodedShardInfo = (
|
|
await waku.libp2p.peerStore.get(nwaku1PeerId)
|
|
).metadata.get("shardInfo");
|
|
expect(encodedShardInfo).to.not.be.undefined;
|
|
|
|
const metadataShardInfo = decodeRelayShard(encodedShardInfo!);
|
|
expect(metadataShardInfo).not.be.undefined;
|
|
|
|
expect(metadataShardInfo!.clusterId).to.eq(clusterId);
|
|
// TODO derive shard from content topic
|
|
// expect(metadataShardInfo.shards).to.include.members(shards);
|
|
});
|
|
|
|
it("receiving a ping from a peer does not overwrite shard info", async function () {
|
|
const clusterId = 2;
|
|
const contentTopic = ["/foo/1/bar/proto"];
|
|
const numShardsInCluster = 0;
|
|
|
|
await nwaku1.start({
|
|
relay: true,
|
|
discv5Discovery: true,
|
|
peerExchange: true,
|
|
clusterId,
|
|
contentTopic
|
|
});
|
|
|
|
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
|
|
const nwaku1PeerId = await nwaku1.getPeerId();
|
|
|
|
waku = await createLightNode({
|
|
networkConfig: {
|
|
clusterId,
|
|
numShardsInCluster
|
|
},
|
|
connectionManager: {
|
|
pingKeepAlive: 1
|
|
}
|
|
});
|
|
await waku.start();
|
|
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
|
|
|
|
// delay to ensure the connection is estabilished, shardInfo is updated, and there is a ping
|
|
await delay(1500);
|
|
|
|
const metadata = (await waku.libp2p.peerStore.get(nwaku1PeerId)).metadata;
|
|
expect(metadata.get("shardInfo")).to.not.be.undefined;
|
|
|
|
const pingInfo = metadata.get("ping");
|
|
expect(pingInfo).to.not.be.undefined;
|
|
});
|
|
});
|
|
});
|