From 124a29ebba59c05fbbf199d969e6ba3f9e57d45b Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Tue, 10 Oct 2023 20:18:02 +0530 Subject: [PATCH] feat(static-sharding): filter peer connections per shards (#1626) * add interface for `ShardInfo` * enr: add deserialization logic & setup getters * add sharding related utils * utils: add shard<-> bytes conversion helpers * pass `pubSubTopics` to `Waku` * add `rs`/`rsv` details during discovery * connection-manager: discard irrelevant peers * add tests for static sharding - peer exchange * update `ConnectionManager` tests to account for topic validity * add js suffix to import * address some comments * move shardInfo encoding to ENR * test: update for new API * enr: add tests for serialisation & deserialisation * address comment * update test * move getPeershardInfo to ConnectionManager and return ShardInfo instead of bytes * update encoding and decoding relay shards to also factor for shards>64 * relay shard encoding decoding: use DataView and verbose spec tests * improve tests for relay shard encoding decoding * rm: only * improve log message for unconfigured pubsub topic * minor improvement * fix: buffer <> Uint8array problems with shard decoding * fix: test * rm: only --- package-lock.json | 4 + packages/core/package.json | 1 + packages/core/src/lib/connection_manager.ts | 52 ++++- packages/core/src/lib/waku.ts | 3 + packages/dns-discovery/src/dns_discovery.ts | 24 ++- packages/enr/package.json | 1 + packages/enr/src/enr.ts | 10 +- packages/enr/src/index.ts | 1 + packages/enr/src/raw_enr.ts | 21 +- packages/enr/src/relay_shard_codec.spec.ts | 68 +++++++ packages/enr/src/relay_shard_codec.ts | 60 ++++++ packages/interfaces/src/enr.ts | 6 + .../src/waku_peer_exchange_discovery.ts | 10 +- packages/sdk/src/create.ts | 24 ++- .../tests/tests/connection_manager.spec.ts | 7 + .../tests/sharding/peer_management.spec.ts | 186 ++++++++++++++++++ .../running_nodes.spec.ts} | 17 +- packages/utils/src/common/sharding.ts | 10 +- 18 files changed, 480 insertions(+), 25 deletions(-) create mode 100644 packages/enr/src/relay_shard_codec.spec.ts create mode 100644 packages/enr/src/relay_shard_codec.ts create mode 100644 packages/tests/tests/sharding/peer_management.spec.ts rename packages/tests/tests/{sharding.spec.ts => sharding/running_nodes.spec.ts} (81%) diff --git a/package-lock.json b/package-lock.json index 383d52c432..a7a08f5558 100644 --- a/package-lock.json +++ b/package-lock.json @@ -25763,6 +25763,7 @@ "license": "MIT OR Apache-2.0", "dependencies": { "@noble/hashes": "^1.3.2", + "@waku/enr": "^0.0.17", "@waku/interfaces": "0.0.18", "@waku/proto": "0.0.5", "@waku/utils": "0.0.11", @@ -25864,6 +25865,7 @@ "@waku/interfaces": "0.0.18", "chai": "^4.3.7", "cspell": "^7.3.2", + "fast-check": "^3.13.1", "mocha": "^10.2.0", "npm-run-all": "^4.1.5", "process": "^0.11.10", @@ -29012,6 +29014,7 @@ "@types/mocha": "^10.0.1", "@types/uuid": "^9.0.3", "@waku/build-utils": "*", + "@waku/enr": "^0.0.17", "@waku/interfaces": "0.0.18", "@waku/proto": "0.0.5", "@waku/utils": "0.0.11", @@ -29116,6 +29119,7 @@ "chai": "^4.3.7", "cspell": "^7.3.2", "debug": "^4.3.4", + "fast-check": "^3.13.1", "js-sha3": "^0.9.2", "mocha": "^10.2.0", "npm-run-all": "^4.1.5", diff --git a/packages/core/package.json b/packages/core/package.json index f3d2c33688..3cec1f8272 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -73,6 +73,7 @@ }, "dependencies": { "@noble/hashes": "^1.3.2", + "@waku/enr": "^0.0.17", "@waku/interfaces": "0.0.18", "@waku/proto": "0.0.5", "@waku/utils": "0.0.11", diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 293c5bb11f..859b64813a 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -1,7 +1,9 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import type { PeerInfo } from "@libp2p/interface/peer-info"; import type { Peer } from "@libp2p/interface/peer-store"; +import type { PeerStore } from "@libp2p/interface/peer-store"; import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events"; +import { decodeRelayShard } from "@waku/enr"; import { ConnectionManagerOptions, EPeersByDiscoveryEvents, @@ -9,9 +11,12 @@ import { IPeersByDiscoveryEvents, IRelay, KeepAliveOptions, - PeersByDiscoveryResult + PeersByDiscoveryResult, + PubSubTopic, + ShardInfo } from "@waku/interfaces"; import { Libp2p, Tags } from "@waku/interfaces"; +import { shardInfoToPubSubTopics } from "@waku/utils"; import debug from "debug"; import { KeepAliveManager } from "./keep_alive_manager.js"; @@ -40,6 +45,7 @@ export class ConnectionManager peerId: string, libp2p: Libp2p, keepAliveOptions: KeepAliveOptions, + pubSubTopics: PubSubTopic[], relay?: IRelay, options?: ConnectionManagerOptions ): ConnectionManager { @@ -48,6 +54,7 @@ export class ConnectionManager instance = new ConnectionManager( libp2p, keepAliveOptions, + pubSubTopics, relay, options ); @@ -104,11 +111,13 @@ export class ConnectionManager private constructor( libp2p: Libp2p, keepAliveOptions: KeepAliveOptions, + private configuredPubSubTopics: PubSubTopic[], relay?: IRelay, options?: Partial ) { super(); this.libp2p = libp2p; + this.configuredPubSubTopics = configuredPubSubTopics; this.options = { maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER, maxBootstrapPeersAllowed: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED, @@ -314,6 +323,20 @@ export class ConnectionManager void (async () => { const { id: peerId } = evt.detail; + if (!(await this.isPeerTopicConfigured(peerId))) { + const shardInfo = await this.getPeerShardInfo( + peerId, + this.libp2p.peerStore + ); + log( + `Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${ + this.configuredPubSubTopics + }). + Not dialing.` + ); + return; + } + const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes( Tags.BOOTSTRAP ); @@ -430,4 +453,31 @@ export class ConnectionManager return []; } } + + private async isPeerTopicConfigured(peerId: PeerId): Promise { + const shardInfo = await this.getPeerShardInfo( + peerId, + this.libp2p.peerStore + ); + + // If there's no shard information, simply return true + if (!shardInfo) return true; + + const pubSubTopics = shardInfoToPubSubTopics(shardInfo); + + const isTopicConfigured = pubSubTopics.some((topic) => + this.configuredPubSubTopics.includes(topic) + ); + return isTopicConfigured; + } + + private async getPeerShardInfo( + peerId: PeerId, + peerStore: PeerStore + ): Promise { + const peer = await peerStore.get(peerId); + const shardInfoBytes = peer.metadata.get("shardInfo"); + if (!shardInfoBytes) return undefined; + return decodeRelayShard(shardInfoBytes); + } } diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index b17dd23138..d4134ac07f 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -7,6 +7,7 @@ import type { IRelay, IStore, Libp2p, + PubSubTopic, Waku } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; @@ -52,6 +53,7 @@ export class WakuNode implements Waku { constructor( options: WakuOptions, + public readonly pubSubTopics: PubSubTopic[], libp2p: Libp2p, store?: (libp2p: Libp2p) => IStore, lightPush?: (libp2p: Libp2p) => ILightPush, @@ -86,6 +88,7 @@ export class WakuNode implements Waku { peerId, libp2p, { pingKeepAlive, relayKeepAlive }, + pubSubTopics, this.relay ); diff --git a/packages/dns-discovery/src/dns_discovery.ts b/packages/dns-discovery/src/dns_discovery.ts index f4ac9e9db6..7fb85f8916 100644 --- a/packages/dns-discovery/src/dns_discovery.ts +++ b/packages/dns-discovery/src/dns_discovery.ts @@ -5,6 +5,7 @@ import type { } from "@libp2p/interface/peer-discovery"; import { peerDiscovery as symbol } from "@libp2p/interface/peer-discovery"; import type { PeerInfo } from "@libp2p/interface/peer-info"; +import { encodeRelayShard } from "@waku/enr"; import type { DnsDiscOptions, DnsDiscoveryComponents, @@ -72,18 +73,16 @@ export class PeerDiscoveryDns return; } - const peerInfo = peerEnr.peerInfo; + const { peerInfo, shardInfo } = peerEnr; if (!peerInfo) { continue; } const tagsToUpdate = { - tags: { - [DEFAULT_BOOTSTRAP_TAG_NAME]: { - value: this._options.tagValue ?? DEFAULT_BOOTSTRAP_TAG_VALUE, - ttl: this._options.tagTTL ?? DEFAULT_BOOTSTRAP_TAG_TTL - } + [DEFAULT_BOOTSTRAP_TAG_NAME]: { + value: this._options.tagValue ?? DEFAULT_BOOTSTRAP_TAG_VALUE, + ttl: this._options.tagTTL ?? DEFAULT_BOOTSTRAP_TAG_TTL } }; @@ -96,11 +95,20 @@ export class PeerDiscoveryDns if (!hasBootstrapTag) { isPeerChanged = true; - await this._components.peerStore.merge(peerInfo.id, tagsToUpdate); + await this._components.peerStore.merge(peerInfo.id, { + tags: tagsToUpdate + }); } } else { isPeerChanged = true; - await this._components.peerStore.save(peerInfo.id, tagsToUpdate); + await this._components.peerStore.save(peerInfo.id, { + tags: tagsToUpdate, + ...(shardInfo && { + metadata: { + shardInfo: encodeRelayShard(shardInfo) + } + }) + }); } if (isPeerChanged) { diff --git a/packages/enr/package.json b/packages/enr/package.json index 2354f8a982..9a34330df4 100644 --- a/packages/enr/package.json +++ b/packages/enr/package.json @@ -71,6 +71,7 @@ "@waku/interfaces": "0.0.18", "chai": "^4.3.7", "cspell": "^7.3.2", + "fast-check": "^3.13.1", "mocha": "^10.2.0", "npm-run-all": "^4.1.5", "process": "^0.11.10", diff --git a/packages/enr/src/enr.ts b/packages/enr/src/enr.ts index e3ee2d314a..4df4da5a4e 100644 --- a/packages/enr/src/enr.ts +++ b/packages/enr/src/enr.ts @@ -6,7 +6,8 @@ import type { ENRValue, IEnr, NodeId, - SequenceNumber + SequenceNumber, + ShardInfo } from "@waku/interfaces"; import debug from "debug"; @@ -64,6 +65,13 @@ export class ENR extends RawEnr implements IEnr { protocol: TransportProtocol | TransportProtocolPerIpVersion ) => Multiaddr | undefined = locationMultiaddrFromEnrFields.bind({}, this); + get shardInfo(): ShardInfo | undefined { + if (this.rs && this.rsv) { + log("Warning: ENR contains both `rs` and `rsv` fields."); + } + return this.rs || this.rsv; + } + setLocationMultiaddr(multiaddr: Multiaddr): void { const protoNames = multiaddr.protoNames(); if ( diff --git a/packages/enr/src/index.ts b/packages/enr/src/index.ts index d8b6fb5481..4835e900ba 100644 --- a/packages/enr/src/index.ts +++ b/packages/enr/src/index.ts @@ -5,3 +5,4 @@ export * from "./enr.js"; export * from "./peer_id.js"; export * from "./waku2_codec.js"; export * from "./crypto.js"; +export * from "./relay_shard_codec.js"; diff --git a/packages/enr/src/raw_enr.ts b/packages/enr/src/raw_enr.ts index 1109d74893..252d63a4a4 100644 --- a/packages/enr/src/raw_enr.ts +++ b/packages/enr/src/raw_enr.ts @@ -3,11 +3,18 @@ import { convertToBytes, convertToString } from "@multiformats/multiaddr/convert"; -import type { ENRKey, ENRValue, SequenceNumber, Waku2 } from "@waku/interfaces"; +import type { + ENRKey, + ENRValue, + SequenceNumber, + ShardInfo, + Waku2 +} from "@waku/interfaces"; import { bytesToUtf8 } from "@waku/utils/bytes"; import { ERR_INVALID_ID } from "./constants.js"; import { decodeMultiaddrs, encodeMultiaddrs } from "./multiaddrs_codec.js"; +import { decodeRelayShard } from "./relay_shard_codec.js"; import { decodeWaku2, encodeWaku2 } from "./waku2_codec.js"; export class RawEnr extends Map { @@ -45,6 +52,18 @@ export class RawEnr extends Map { } } + get rs(): ShardInfo | undefined { + const rs = this.get("rs"); + if (!rs) return undefined; + return decodeRelayShard(rs); + } + + get rsv(): ShardInfo | undefined { + const rsv = this.get("rsv"); + if (!rsv) return undefined; + return decodeRelayShard(rsv); + } + get ip(): string | undefined { return getStringValue(this, "ip", "ip4"); } diff --git a/packages/enr/src/relay_shard_codec.spec.ts b/packages/enr/src/relay_shard_codec.spec.ts new file mode 100644 index 0000000000..38857dba49 --- /dev/null +++ b/packages/enr/src/relay_shard_codec.spec.ts @@ -0,0 +1,68 @@ +import { expect } from "chai"; +import fc from "fast-check"; + +import { decodeRelayShard, encodeRelayShard } from "./relay_shard_codec.js"; + +describe("Relay Shard codec", () => { + // Boundary test case + it("should handle a minimal index list", () => { + const shardInfo = { cluster: 0, indexList: [0] }; + const encoded = encodeRelayShard(shardInfo); + const decoded = decodeRelayShard(encoded); + expect(decoded).to.deep.equal( + shardInfo, + "Decoded shard info does not match the original for minimal index list" + ); + }); + + // Property-based test for rs format (Index List) + it("should correctly encode and decode relay shards using rs format (Index List)", () => { + fc.assert( + fc.property( + fc.nat(65535), // cluster + fc + .array(fc.nat(1023), { minLength: 1, maxLength: 63 }) // indexList + .map((arr) => [...new Set(arr)].sort((a, b) => a - b)), + (cluster, indexList) => { + const shardInfo = { cluster, indexList }; + const encoded = encodeRelayShard(shardInfo); + const decoded = decodeRelayShard(encoded); + + expect(decoded).to.deep.equal( + shardInfo, + "Decoded shard info does not match the original for rs format" + ); + } + ) + ); + }); + + // Property-based test for rsv format (Bit Vector) + it("should correctly encode and decode relay shards using rsv format (Bit Vector)", () => { + fc.assert( + fc.property( + fc.nat(65535), // cluster + fc + .array(fc.nat(1023), { minLength: 64, maxLength: 1024 }) // indexList + .map((arr) => [...new Set(arr)].sort((a, b) => a - b)), + (cluster, indexList) => { + const shardInfo = { cluster, indexList }; + const encoded = encodeRelayShard(shardInfo); + const decoded = decodeRelayShard(encoded); + + expect(decoded).to.deep.equal( + shardInfo, + "Decoded shard info does not match the original for rsv format" + ); + } + ) + ); + }); + + // Error handling test case + it("should throw an error for insufficient data", () => { + expect(() => decodeRelayShard(new Uint8Array([0, 0]))).to.throw( + "Insufficient data" + ); + }); +}); diff --git a/packages/enr/src/relay_shard_codec.ts b/packages/enr/src/relay_shard_codec.ts new file mode 100644 index 0000000000..9eaad8fffd --- /dev/null +++ b/packages/enr/src/relay_shard_codec.ts @@ -0,0 +1,60 @@ +import type { ShardInfo } from "@waku/interfaces"; + +export const decodeRelayShard = (bytes: Uint8Array): ShardInfo => { + // explicitly converting to Uint8Array to avoid Buffer + // https://github.com/libp2p/js-libp2p/issues/2146 + bytes = new Uint8Array(bytes); + + if (bytes.length < 3) throw new Error("Insufficient data"); + + const view = new DataView(bytes.buffer); + const cluster = view.getUint16(0); + + const indexList = []; + + if (bytes.length === 130) { + // rsv format (Bit Vector) + for (let i = 0; i < 1024; i++) { + const byteIndex = Math.floor(i / 8) + 2; // Adjusted for the 2-byte cluster field + const bitIndex = 7 - (i % 8); + if (view.getUint8(byteIndex) & (1 << bitIndex)) { + indexList.push(i); + } + } + } else { + // rs format (Index List) + const numIndices = view.getUint8(2); + for (let i = 0, offset = 3; i < numIndices; i++, offset += 2) { + if (offset + 1 >= bytes.length) throw new Error("Unexpected end of data"); + indexList.push(view.getUint16(offset)); + } + } + + return { cluster, indexList }; +}; + +export const encodeRelayShard = (shardInfo: ShardInfo): Uint8Array => { + const { cluster, indexList } = shardInfo; + const totalLength = indexList.length >= 64 ? 130 : 3 + 2 * indexList.length; + const buffer = new ArrayBuffer(totalLength); + const view = new DataView(buffer); + + view.setUint16(0, cluster); + + if (indexList.length >= 64) { + // rsv format (Bit Vector) + for (const index of indexList) { + const byteIndex = Math.floor(index / 8) + 2; // Adjusted for the 2-byte cluster field + const bitIndex = 7 - (index % 8); + view.setUint8(byteIndex, view.getUint8(byteIndex) | (1 << bitIndex)); + } + } else { + // rs format (Index List) + view.setUint8(2, indexList.length); + for (let i = 0, offset = 3; i < indexList.length; i++, offset += 2) { + view.setUint16(offset, indexList[i]); + } + } + + return new Uint8Array(buffer); +}; diff --git a/packages/interfaces/src/enr.ts b/packages/interfaces/src/enr.ts index 549827ba84..b06d51cacd 100644 --- a/packages/interfaces/src/enr.ts +++ b/packages/interfaces/src/enr.ts @@ -18,6 +18,11 @@ export interface Waku2 { lightPush: boolean; } +export interface ShardInfo { + cluster: number; + indexList: number[]; +} + export interface IEnr extends Map { nodeId?: NodeId; peerId?: PeerId; @@ -34,6 +39,7 @@ export interface IEnr extends Map { multiaddrs?: Multiaddr[]; waku2?: Waku2; peerInfo: PeerInfo | undefined; + shardInfo?: ShardInfo; /** * @deprecated: use { @link IEnr.peerInfo } instead. diff --git a/packages/peer-exchange/src/waku_peer_exchange_discovery.ts b/packages/peer-exchange/src/waku_peer_exchange_discovery.ts index 14c19c45fe..7cbae74c80 100644 --- a/packages/peer-exchange/src/waku_peer_exchange_discovery.ts +++ b/packages/peer-exchange/src/waku_peer_exchange_discovery.ts @@ -7,6 +7,7 @@ import type { import { peerDiscovery as symbol } from "@libp2p/interface/peer-discovery"; import type { PeerId } from "@libp2p/interface/peer-id"; import type { PeerInfo } from "@libp2p/interface/peer-info"; +import { encodeRelayShard } from "@waku/enr"; import { Libp2pComponents, Tags } from "@waku/interfaces"; import debug from "debug"; @@ -174,7 +175,7 @@ export class PeerExchangeDiscovery continue; } - const { peerId, peerInfo } = ENR; + const { peerId, peerInfo, shardInfo } = ENR; if (!peerId || !peerInfo) { continue; } @@ -191,7 +192,12 @@ export class PeerExchangeDiscovery value: this.options.tagValue ?? DEFAULT_PEER_EXCHANGE_TAG_VALUE, ttl: this.options.tagTTL ?? DEFAULT_PEER_EXCHANGE_TAG_TTL } - } + }, + ...(shardInfo && { + metadata: { + shardInfo: encodeRelayShard(shardInfo) + } + }) }); log(`Discovered peer: ${peerId.toString()}`); diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index 0b78fd39fc..53a8fa5a02 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -5,6 +5,7 @@ import { mplex } from "@libp2p/mplex"; import { webSockets } from "@libp2p/websockets"; import { all as filterAll } from "@libp2p/websockets/filters"; import { + DefaultPubSubTopic, DefaultUserAgent, wakuFilter, wakuLightPush, @@ -43,6 +44,12 @@ export { Libp2pComponents }; export async function createLightNode( options?: ProtocolCreateOptions & WakuOptions ): Promise { + options = options ?? {}; + + if (!options.pubSubTopics) { + options.pubSubTopics = [DefaultPubSubTopic]; + } + const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { @@ -62,6 +69,7 @@ export async function createLightNode( return new WakuNode( options ?? {}, + options.pubSubTopics, libp2p, store, lightPush, @@ -76,6 +84,12 @@ export async function createLightNode( export async function createRelayNode( options?: ProtocolCreateOptions & WakuOptions & Partial ): Promise { + options = options ?? {}; + + if (!options.pubSubTopics) { + options.pubSubTopics = [DefaultPubSubTopic]; + } + const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { @@ -92,7 +106,8 @@ export async function createRelayNode( const relay = wakuRelay(options); return new WakuNode( - options ?? {}, + options, + options.pubSubTopics, libp2p, undefined, undefined, @@ -117,6 +132,12 @@ export async function createRelayNode( export async function createFullNode( options?: ProtocolCreateOptions & WakuOptions & Partial ): Promise { + options = options ?? {}; + + if (!options.pubSubTopics) { + options.pubSubTopics = [DefaultPubSubTopic]; + } + const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { @@ -137,6 +158,7 @@ export async function createFullNode( return new WakuNode( options ?? {}, + options.pubSubTopics, libp2p, store, lightPush, diff --git a/packages/tests/tests/connection_manager.spec.ts b/packages/tests/tests/connection_manager.spec.ts index b6e1f50a5e..341af18444 100644 --- a/packages/tests/tests/connection_manager.spec.ts +++ b/packages/tests/tests/connection_manager.spec.ts @@ -146,16 +146,23 @@ describe("ConnectionManager", function () { let dialPeerStub: SinonStub; let getConnectionsStub: SinonStub; let getTagNamesForPeerStub: SinonStub; + let isPeerTopicConfigured: SinonStub; let waku: LightNode; this.beforeEach(async function () { this.timeout(15000); waku = await createLightNode(); + isPeerTopicConfigured = sinon.stub( + waku.connectionManager as any, + "isPeerTopicConfigured" + ); + isPeerTopicConfigured.resolves(true); }); afterEach(async () => { this.timeout(15000); await waku.stop(); + isPeerTopicConfigured.restore(); sinon.restore(); }); diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts new file mode 100644 index 0000000000..38f4c3da9f --- /dev/null +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -0,0 +1,186 @@ +import { bootstrap } from "@libp2p/bootstrap"; +import type { PeerId } from "@libp2p/interface/peer-id"; +import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; +import { createLightNode, LightNode, Tags } from "@waku/sdk"; +import chai, { expect } from "chai"; +import chaiAsPromised from "chai-as-promised"; +import Sinon, { SinonSpy } from "sinon"; + +import { delay } from "../../src/delay.js"; +import { makeLogFileName } from "../../src/log_file.js"; +import { NimGoNode } from "../../src/node/node.js"; + +chai.use(chaiAsPromised); + +describe("Static Sharding: Peer Management", function () { + describe("Peer Exchange", function () { + let waku: LightNode; + let nwaku1: NimGoNode; + let nwaku2: NimGoNode; + let nwaku3: NimGoNode; + + let attemptDialSpy: SinonSpy; + + beforeEach(async function () { + nwaku1 = new NimGoNode(makeLogFileName(this) + "1"); + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + nwaku3 = new NimGoNode(makeLogFileName(this) + "3"); + }); + + afterEach(async function () { + this.timeout(5_000); + await nwaku1?.stop(); + await nwaku2?.stop(); + await nwaku3?.stop(); + !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); + + attemptDialSpy && attemptDialSpy.restore(); + }); + + it("all px service nodes subscribed to the shard topic should be dialed", async function () { + this.timeout(100_000); + + const pubSubTopics = ["/waku/2/rs/18/2"]; + + await nwaku1.start({ + topic: pubSubTopics, + discv5Discovery: true, + peerExchange: true, + relay: true + }); + + const enr1 = (await nwaku1.info()).enrUri; + + await nwaku2.start({ + topic: pubSubTopics, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: enr1, + relay: true + }); + + const enr2 = (await nwaku2.info()).enrUri; + + await nwaku3.start({ + topic: pubSubTopics, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: enr2, + relay: true + }); + const nwaku3Ma = await nwaku3.getMultiaddrWithId(); + + waku = await createLightNode({ + pubSubTopics, + libp2p: { + peerDiscovery: [ + bootstrap({ list: [nwaku3Ma.toString()] }), + wakuPeerExchangeDiscovery() + ] + } + }); + + await waku.start(); + + attemptDialSpy = Sinon.spy( + (waku as any).connectionManager, + "attemptDial" + ); + + const pxPeersDiscovered = new Set(); + + await new Promise((resolve) => { + waku.libp2p.addEventListener("peer:discovery", (evt) => { + return void (async () => { + const peerId = evt.detail.id; + const peer = await waku.libp2p.peerStore.get(peerId); + const tags = Array.from(peer.tags.keys()); + if (tags.includes(Tags.PEER_EXCHANGE)) { + pxPeersDiscovered.add(peerId); + if (pxPeersDiscovered.size === 2) { + resolve(); + } + } + })(); + }); + }); + + await delay(1000); + + expect(attemptDialSpy.callCount).to.equal(3); + }); + + it("px service nodes not subscribed to the shard should not be dialed", async function () { + this.timeout(100_000); + const pubSubTopicsToDial = ["/waku/2/rs/18/2"]; + const pubSubTopicsToIgnore = ["/waku/2/rs/18/3"]; + + // this service node is not subscribed to the shard + await nwaku1.start({ + topic: pubSubTopicsToIgnore, + relay: true, + discv5Discovery: true, + peerExchange: true + }); + + const enr1 = (await nwaku1.info()).enrUri; + + await nwaku2.start({ + topic: pubSubTopicsToDial, + relay: true, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: enr1 + }); + + const enr2 = (await nwaku2.info()).enrUri; + + await nwaku3.start({ + relay: true, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: enr2 + }); + const nwaku3Ma = await nwaku3.getMultiaddrWithId(); + + waku = await createLightNode({ + pubSubTopics: pubSubTopicsToDial, + libp2p: { + peerDiscovery: [ + bootstrap({ list: [nwaku3Ma.toString()] }), + wakuPeerExchangeDiscovery() + ] + } + }); + + attemptDialSpy = Sinon.spy( + (waku as any).connectionManager, + "attemptDial" + ); + + await waku.start(); + + const pxPeersDiscovered = new Set(); + + await new Promise((resolve) => { + waku.libp2p.addEventListener("peer:discovery", (evt) => { + return void (async () => { + const peerId = evt.detail.id; + const peer = await waku.libp2p.peerStore.get(peerId); + const tags = Array.from(peer.tags.keys()); + if (tags.includes(Tags.PEER_EXCHANGE)) { + pxPeersDiscovered.add(peerId); + if (pxPeersDiscovered.size === 1) { + resolve(); + } + } + })(); + }); + }); + + await delay(1000); + + expect(attemptDialSpy.callCount).to.equal(2); + }); + }); +}); diff --git a/packages/tests/tests/sharding.spec.ts b/packages/tests/tests/sharding/running_nodes.spec.ts similarity index 81% rename from packages/tests/tests/sharding.spec.ts rename to packages/tests/tests/sharding/running_nodes.spec.ts index 3e830e3e7f..09c474558c 100644 --- a/packages/tests/tests/sharding.spec.ts +++ b/packages/tests/tests/sharding/running_nodes.spec.ts @@ -1,19 +1,16 @@ -import { createLightNode, LightNode, utf8ToBytes } from "@waku/sdk"; -import { createEncoder } from "@waku/sdk"; -import chai, { expect } from "chai"; -import chaiAsPromised from "chai-as-promised"; +import { LightNode } from "@waku/interfaces"; +import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; +import { expect } from "chai"; -import { makeLogFileName } from "../src/log_file.js"; -import { NimGoNode } from "../src/node/node.js"; +import { makeLogFileName } from "../../src/log_file.js"; +import { NimGoNode } from "../../src/node/node.js"; const PubSubTopic1 = "/waku/2/rs/0/2"; const PubSubTopic2 = "/waku/2/rs/0/3"; -const ContentTopic = "/waku/2/content/test"; +const ContentTopic = "/waku/2/content/test.js"; -chai.use(chaiAsPromised); - -describe("Static Sharding", () => { +describe("Static Sharding: Running Nodes", () => { let waku: LightNode; let nwaku: NimGoNode; diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index f14ba20d69..258962b61f 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -1,4 +1,12 @@ -import type { PubSubTopic } from "@waku/interfaces"; +import type { PubSubTopic, ShardInfo } from "@waku/interfaces"; + +export const shardInfoToPubSubTopics = ( + shardInfo: ShardInfo +): PubSubTopic[] => { + return shardInfo.indexList.map( + (index) => `/waku/2/rs/${shardInfo.cluster}/${index}` + ); +}; export function ensurePubsubTopicIsConfigured( pubsubTopic: PubSubTopic,