From e138b4f5c49a35a37830e31e8be87d824f53249f Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Thu, 25 Jan 2024 20:07:58 -0800 Subject: [PATCH] feat: decouple sharding params out of core --- packages/core/src/lib/base_protocol.ts | 22 +------- packages/core/src/lib/filter/index.ts | 10 +++- packages/core/src/lib/light_push/index.ts | 8 ++- packages/core/src/lib/metadata/index.ts | 14 +++-- packages/core/src/lib/store/index.ts | 2 +- packages/core/src/lib/wait_for_remote_peer.ts | 17 +++--- packages/core/src/lib/waku.ts | 20 +++---- packages/interfaces/src/waku.ts | 4 +- .../peer-exchange/src/waku_peer_exchange.ts | 16 +++--- .../src/waku_peer_exchange_discovery.ts | 18 ++++--- packages/relay/src/index.ts | 18 ++----- packages/sdk/src/create.ts | 54 +++++++++++-------- packages/sdk/src/relay/index.ts | 30 +++++++---- .../tests/tests/connection_manager.spec.ts | 6 ++- packages/tests/tests/getPeers.spec.ts | 2 +- .../tests/tests/peer_exchange.node.spec.ts | 25 ++++++--- .../tests/peer_exchange.optional.spec.ts | 4 +- .../tests/sharding/peer_management.spec.ts | 8 +-- .../tests/sharding/running_nodes.spec.ts | 2 +- packages/utils/src/common/sharding.ts | 4 +- 20 files changed, 150 insertions(+), 134 deletions(-) diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index ea96b2b293..aea1ab1847 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -6,12 +6,7 @@ import type { ProtocolCreateOptions, PubsubTopic } from "@waku/interfaces"; -import { DefaultPubsubTopic } from "@waku/interfaces"; -import { - ensureShardingConfigured, - Logger, - shardInfoToPubsubTopics -} from "@waku/utils"; +import { ensureShardingConfigured, Logger } from "@waku/utils"; import { getConnectedPeersForProtocolAndShard, getPeersForProtocol, @@ -32,16 +27,14 @@ export class BaseProtocol implements IBaseProtocol { public readonly removeLibp2pEventListener: Libp2p["removeEventListener"]; readonly numPeersToUse: number; protected streamManager: StreamManager; - protected pubsubTopics: PubsubTopic[]; constructor( public multicodec: string, private components: Libp2pComponents, private log: Logger, + protected pubsubTopics: PubsubTopic[], private options?: ProtocolCreateOptions ) { - this.pubsubTopics = this.initializePubsubTopic(options); - this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; this.addLibp2pEventListener = components.events.addEventListener.bind( @@ -143,15 +136,4 @@ export class BaseProtocol implements IBaseProtocol { return sortedFilteredPeers; } - - private initializePubsubTopic( - options?: ProtocolCreateOptions - ): PubsubTopic[] { - return ( - options?.pubsubTopics ?? - (options?.shardInfo - ? shardInfoToPubsubTopics(options.shardInfo) - : [DefaultPubsubTopic]) - ); - } } diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 7e6353964c..cadf1720c2 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -358,7 +358,13 @@ class Filter extends BaseProtocol implements IReceiver { } constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super(FilterCodecs.SUBSCRIBE, libp2p.components, log, options); + super( + FilterCodecs.SUBSCRIBE, + libp2p.components, + log, + options!.pubsubTopics!, + options + ); libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => { log.error("Failed to register ", FilterCodecs.PUSH, e); @@ -493,7 +499,7 @@ class Filter extends BaseProtocol implements IReceiver { } export function wakuFilter( - init: Partial = {} + init: ProtocolCreateOptions = { pubsubTopics: [] } ): (libp2p: Libp2p) => IFilter { return (libp2p: Libp2p) => new Filter(libp2p, init); } diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index ffab97cb55..d983ea5233 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -43,7 +43,13 @@ type PreparePushMessageResult = */ class LightPush extends BaseProtocol implements ILightPush { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super(LightPushCodec, libp2p.components, log, options); + super( + LightPushCodec, + libp2p.components, + log, + options!.pubsubTopics!, + options + ); } private async preparePushMessage( diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index f48024b3f7..0860294197 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -4,11 +4,10 @@ import type { IMetadata, Libp2pComponents, PeerIdStr, - ShardInfo, - ShardingParams + ShardInfo } from "@waku/interfaces"; import { proto_metadata } from "@waku/proto"; -import { encodeRelayShard, Logger } from "@waku/utils"; +import { encodeRelayShard, Logger, shardInfoToPubsubTopics } from "@waku/utils"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; @@ -25,10 +24,15 @@ class Metadata extends BaseProtocol implements IMetadata { handshakesConfirmed: Set = new Set(); constructor( - public shardInfo: ShardingParams, + public shardInfo: ShardInfo, libp2p: Libp2pComponents ) { - super(MetadataCodec, libp2p.components, log, shardInfo && { shardInfo }); + super( + MetadataCodec, + libp2p.components, + log, + shardInfoToPubsubTopics(shardInfo) + ); this.libp2pComponents = libp2p; void libp2p.registrar.handle(MetadataCodec, (streamData) => { void this.onRequest(streamData); diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index d5ca1c1b47..8b77adb605 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -76,7 +76,7 @@ class Store extends BaseProtocol implements IStore { private readonly NUM_PEERS_PROTOCOL = 1; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super(StoreCodec, libp2p.components, log, options); + super(StoreCodec, libp2p.components, log, options!.pubsubTopics!, options); } /** diff --git a/packages/core/src/lib/wait_for_remote_peer.ts b/packages/core/src/lib/wait_for_remote_peer.ts index 183e81d75c..2ddcd65768 100644 --- a/packages/core/src/lib/wait_for_remote_peer.ts +++ b/packages/core/src/lib/wait_for_remote_peer.ts @@ -31,11 +31,6 @@ export async function waitForRemotePeer( ): Promise { protocols = protocols ?? getEnabledProtocols(waku); - const isShardingEnabled = waku.shardInfo !== undefined; - const metadataService = isShardingEnabled - ? waku.libp2p.services.metadata - : undefined; - if (!waku.isStarted()) return Promise.reject("Waku node is not started"); const promises = []; @@ -49,19 +44,25 @@ export async function waitForRemotePeer( if (protocols.includes(Protocols.Store)) { if (!waku.store) throw new Error("Cannot wait for Store peer: protocol not mounted"); - promises.push(waitForConnectedPeer(waku.store, metadataService)); + promises.push( + waitForConnectedPeer(waku.store, waku.libp2p.services.metadata) + ); } if (protocols.includes(Protocols.LightPush)) { if (!waku.lightPush) throw new Error("Cannot wait for LightPush peer: protocol not mounted"); - promises.push(waitForConnectedPeer(waku.lightPush, metadataService)); + promises.push( + waitForConnectedPeer(waku.lightPush, waku.libp2p.services.metadata) + ); } if (protocols.includes(Protocols.Filter)) { if (!waku.filter) throw new Error("Cannot wait for Filter peer: protocol not mounted"); - promises.push(waitForConnectedPeer(waku.filter, metadataService)); + promises.push( + waitForConnectedPeer(waku.filter, waku.libp2p.services.metadata) + ); } if (timeoutMs) { diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index 0671386e2a..dabf3ca5d0 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -8,11 +8,10 @@ import type { IStore, Libp2p, PubsubTopic, - ShardingParams, Waku } from "@waku/interfaces"; -import { DefaultPubsubTopic, Protocols } from "@waku/interfaces"; -import { Logger, shardInfoToPubsubTopics } from "@waku/utils"; +import { Protocols } from "@waku/interfaces"; +import { Logger } from "@waku/utils"; import { ConnectionManager } from "./connection_manager.js"; @@ -42,6 +41,7 @@ export interface WakuOptions { * @default {@link @waku/core.DefaultUserAgent} */ userAgent?: string; + pubsubTopics: PubsubTopic[]; } export class WakuNode implements Waku { @@ -55,20 +55,16 @@ export class WakuNode implements Waku { constructor( options: WakuOptions, - pubsubTopics: PubsubTopic[] = [], libp2p: Libp2p, - private pubsubShardInfo?: ShardingParams, store?: (libp2p: Libp2p) => IStore, lightPush?: (libp2p: Libp2p) => ILightPush, filter?: (libp2p: Libp2p) => IFilter, relay?: (libp2p: Libp2p) => IRelay ) { - if (!pubsubShardInfo) { - this.pubsubTopics = - pubsubTopics.length > 0 ? pubsubTopics : [DefaultPubsubTopic]; - } else { - this.pubsubTopics = shardInfoToPubsubTopics(pubsubShardInfo); + if (options.pubsubTopics.length == 0) { + throw new Error("At least one pubsub topic must be provided"); } + this.pubsubTopics = options.pubsubTopics; this.libp2p = libp2p; @@ -110,10 +106,6 @@ export class WakuNode implements Waku { ); } - get shardInfo(): ShardingParams | undefined { - return this.pubsubShardInfo; - } - /** * Dials to the provided peer. * diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 260c806235..ffb6118ca3 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -5,7 +5,7 @@ import { IConnectionManager } from "./connection_manager.js"; import type { IFilter } from "./filter.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; -import { Protocols, ShardingParams } from "./protocols.js"; +import { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; import type { IStore } from "./store.js"; @@ -16,8 +16,6 @@ export interface Waku { filter?: IFilter; lightPush?: ILightPush; - shardInfo?: ShardingParams; - connectionManager: IConnectionManager; dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise; diff --git a/packages/peer-exchange/src/waku_peer_exchange.ts b/packages/peer-exchange/src/waku_peer_exchange.ts index 473864474a..9471b72271 100644 --- a/packages/peer-exchange/src/waku_peer_exchange.ts +++ b/packages/peer-exchange/src/waku_peer_exchange.ts @@ -4,7 +4,8 @@ import type { IPeerExchange, Libp2pComponents, PeerExchangeQueryParams, - PeerInfo + PeerInfo, + PubsubTopic } from "@waku/interfaces"; import { isDefined } from "@waku/utils"; import { Logger } from "@waku/utils"; @@ -26,8 +27,8 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { /** * @param components - libp2p components */ - constructor(components: Libp2pComponents) { - super(PeerExchangeCodec, components, log); + constructor(components: Libp2pComponents, pubsubTopics: PubsubTopic[]) { + super(PeerExchangeCodec, components, log, pubsubTopics); } /** @@ -91,8 +92,9 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { * * @returns A function that creates a new peer exchange protocol */ -export function wakuPeerExchange(): ( - components: Libp2pComponents -) => WakuPeerExchange { - return (components: Libp2pComponents) => new WakuPeerExchange(components); +export function wakuPeerExchange( + pubsubTopics: PubsubTopic[] +): (components: Libp2pComponents) => WakuPeerExchange { + return (components: Libp2pComponents) => + new WakuPeerExchange(components, pubsubTopics); } diff --git a/packages/peer-exchange/src/waku_peer_exchange_discovery.ts b/packages/peer-exchange/src/waku_peer_exchange_discovery.ts index 5690c13964..dc49339823 100644 --- a/packages/peer-exchange/src/waku_peer_exchange_discovery.ts +++ b/packages/peer-exchange/src/waku_peer_exchange_discovery.ts @@ -7,7 +7,7 @@ import type { PeerId, PeerInfo } from "@libp2p/interface"; -import { Libp2pComponents, Tags } from "@waku/interfaces"; +import { Libp2pComponents, PubsubTopic, Tags } from "@waku/interfaces"; import { encodeRelayShard, Logger } from "@waku/utils"; import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js"; @@ -77,10 +77,14 @@ export class PeerExchangeDiscovery ); }; - constructor(components: Libp2pComponents, options: Options = {}) { + constructor( + components: Libp2pComponents, + pubsubTopics: PubsubTopic[], + options: Options = {} + ) { super(); this.components = components; - this.peerExchange = new WakuPeerExchange(components); + this.peerExchange = new WakuPeerExchange(components, pubsubTopics); this.options = options; this.isStarted = false; } @@ -219,9 +223,9 @@ export class PeerExchangeDiscovery } } -export function wakuPeerExchangeDiscovery(): ( - components: Libp2pComponents -) => PeerExchangeDiscovery { +export function wakuPeerExchangeDiscovery( + pubsubTopics: PubsubTopic[] +): (components: Libp2pComponents) => PeerExchangeDiscovery { return (components: Libp2pComponents) => - new PeerExchangeDiscovery(components); + new PeerExchangeDiscovery(components, pubsubTopics); } diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 0f9ac329f1..a5f6f241f7 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -24,11 +24,7 @@ import { SendError, SendResult } from "@waku/interfaces"; -import { - isWireSizeUnderCap, - shardInfoToPubsubTopics, - toAsyncIterator -} from "@waku/utils"; +import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils"; import { pushOrInitMapSet } from "@waku/utils"; import { Logger } from "@waku/utils"; @@ -63,7 +59,7 @@ class Relay implements IRelay { */ private observers: Map>>; - constructor(libp2p: Libp2p, options?: Partial) { + constructor(libp2p: Libp2p, pubsubTopics: PubsubTopic[]) { if (!this.isRelayPubsub(libp2p.services.pubsub)) { throw Error( `Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}` @@ -71,11 +67,7 @@ class Relay implements IRelay { } this.gossipSub = libp2p.services.pubsub as GossipSub; - this.pubsubTopics = new Set( - options?.shardInfo - ? shardInfoToPubsubTopics(options.shardInfo) - : options?.pubsubTopics ?? [DefaultPubsubTopic] - ); + this.pubsubTopics = new Set(pubsubTopics); if (this.gossipSub.isStarted()) { this.subscribeToAllTopics(); @@ -283,9 +275,9 @@ class Relay implements IRelay { } export function wakuRelay( - init: Partial = {} + pubsubTopics: PubsubTopic[] ): (libp2p: Libp2p) => IRelay { - return (libp2p: Libp2p) => new Relay(libp2p, init); + return (libp2p: Libp2p) => new Relay(libp2p, pubsubTopics); } export function wakuGossipSub( diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index 4f438e45ee..4abc43b533 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -18,12 +18,14 @@ import { import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery"; import { type CreateLibp2pOptions, + DefaultPubsubTopic, type FullNode, type IMetadata, type Libp2p, type Libp2pComponents, type LightNode, type ProtocolCreateOptions, + PubsubTopic, type ShardInfo } from "@waku/interfaces"; import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; @@ -43,20 +45,24 @@ export { Libp2pComponents }; * Create a Waku node configured to use autosharding or static sharding. */ export async function createNode( - options?: ProtocolCreateOptions & WakuOptions & Partial + options?: ProtocolCreateOptions & + Partial & + Partial ): Promise { - options = options ?? {}; + options = options ?? { pubsubTopics: [] }; if (!options.shardInfo) { throw new Error("Shard info must be set"); } const shardInfo = ensureShardingConfigured(options.shardInfo); + options.pubsubTopics = shardInfo.pubsubTopics; + options.shardInfo = shardInfo.shardInfo; const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { - peerDiscovery.push(...defaultPeerDiscoveries()); + peerDiscovery.push(...defaultPeerDiscoveries(shardInfo.pubsubTopics)); Object.assign(libp2pOptions, { peerDiscovery }); } @@ -72,10 +78,8 @@ export async function createNode( const filter = wakuFilter(options); return new WakuNode( - options ?? {}, - [], + options as WakuOptions, libp2p, - shardInfo.shardInfo, store, lightPush, filter @@ -88,7 +92,7 @@ export async function createNode( * Uses Waku Filter V2 by default. */ export async function createLightNode( - options?: ProtocolCreateOptions & WakuOptions + options?: ProtocolCreateOptions & Partial ): Promise { options = options ?? {}; @@ -96,10 +100,13 @@ export async function createLightNode( ? ensureShardingConfigured(options.shardInfo) : undefined; + options.pubsubTopics = shardInfo?.pubsubTopics ?? + options.pubsubTopics ?? [DefaultPubsubTopic]; + const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { - peerDiscovery.push(...defaultPeerDiscoveries()); + peerDiscovery.push(...defaultPeerDiscoveries(options.pubsubTopics)); Object.assign(libp2pOptions, { peerDiscovery }); } @@ -115,10 +122,8 @@ export async function createLightNode( const filter = wakuFilter(options); return new WakuNode( - options ?? {}, - options.pubsubTopics, + options as WakuOptions, libp2p, - shardInfo?.shardingParams, store, lightPush, filter @@ -139,18 +144,25 @@ export async function createLightNode( * @internal */ export async function createFullNode( - options?: ProtocolCreateOptions & WakuOptions & Partial + options?: ProtocolCreateOptions & + Partial & + Partial ): Promise { - options = options ?? {}; + options = options ?? { pubsubTopics: [] }; const shardInfo = options.shardInfo ? ensureShardingConfigured(options.shardInfo) : undefined; + const pubsubTopics = shardInfo?.pubsubTopics ?? + options.pubsubTopics ?? [DefaultPubsubTopic]; + options.pubsubTopics = pubsubTopics; + options.shardInfo = shardInfo?.shardInfo; + const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { - peerDiscovery.push(...defaultPeerDiscoveries()); + peerDiscovery.push(...defaultPeerDiscoveries(pubsubTopics)); Object.assign(libp2pOptions, { peerDiscovery }); } @@ -164,13 +176,11 @@ export async function createFullNode( const store = wakuStore(options); const lightPush = wakuLightPush(options); const filter = wakuFilter(options); - const relay = wakuRelay(options); + const relay = wakuRelay(pubsubTopics); return new WakuNode( - options ?? {}, - options.pubsubTopics, + options as WakuOptions, libp2p, - shardInfo?.shardingParams, store, lightPush, filter, @@ -178,12 +188,12 @@ export async function createFullNode( ) as FullNode; } -export function defaultPeerDiscoveries(): (( - components: Libp2pComponents -) => PeerDiscovery)[] { +export function defaultPeerDiscoveries( + pubsubTopics: PubsubTopic[] +): ((components: Libp2pComponents) => PeerDiscovery)[] { const discoveries = [ wakuDnsDiscovery([enrTree["PROD"]], DEFAULT_NODE_REQUIREMENTS), - wakuPeerExchangeDiscovery() + wakuPeerExchangeDiscovery(pubsubTopics) ]; return discoveries; } diff --git a/packages/sdk/src/relay/index.ts b/packages/sdk/src/relay/index.ts index 0e1be794df..6a993852f7 100644 --- a/packages/sdk/src/relay/index.ts +++ b/packages/sdk/src/relay/index.ts @@ -1,5 +1,9 @@ import { WakuNode, WakuOptions } from "@waku/core"; -import type { ProtocolCreateOptions, RelayNode } from "@waku/interfaces"; +import { + DefaultPubsubTopic, + type ProtocolCreateOptions, + type RelayNode +} from "@waku/interfaces"; import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay"; import { ensureShardingConfigured } from "@waku/utils"; @@ -16,21 +20,27 @@ import { defaultLibp2p, defaultPeerDiscoveries } from "../create.js"; * or use this function with caution. */ export async function createRelayNode( - options?: ProtocolCreateOptions & WakuOptions & Partial + options?: ProtocolCreateOptions & + Partial & + Partial ): Promise { - options = options ?? {}; + options = options ?? { pubsubTopics: [] }; const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; - if (options?.defaultBootstrap) { - peerDiscovery.push(...defaultPeerDiscoveries()); - Object.assign(libp2pOptions, { peerDiscovery }); - } const shardInfo = options.shardInfo ? ensureShardingConfigured(options.shardInfo) : undefined; + options.pubsubTopics = shardInfo?.pubsubTopics ?? + options.pubsubTopics ?? [DefaultPubsubTopic]; + + if (options?.defaultBootstrap) { + peerDiscovery.push(...defaultPeerDiscoveries(options.pubsubTopics)); + Object.assign(libp2pOptions, { peerDiscovery }); + } + const libp2p = await defaultLibp2p( shardInfo?.shardInfo, wakuGossipSub(options), @@ -38,13 +48,11 @@ export async function createRelayNode( options?.userAgent ); - const relay = wakuRelay(options); + const relay = wakuRelay(options.pubsubTopics); return new WakuNode( - options, - options.pubsubTopics, + options as WakuOptions, libp2p, - shardInfo?.shardingParams, undefined, undefined, undefined, diff --git a/packages/tests/tests/connection_manager.spec.ts b/packages/tests/tests/connection_manager.spec.ts index 21989689e0..170c72da3f 100644 --- a/packages/tests/tests/connection_manager.spec.ts +++ b/packages/tests/tests/connection_manager.spec.ts @@ -24,7 +24,9 @@ describe("ConnectionManager", function () { let waku: LightNode; beforeEach(async function () { - waku = await createLightNode(); + waku = await createLightNode({ + shardInfo: { shards: [0] } + }); }); afterEach(async () => { @@ -271,7 +273,7 @@ describe("ConnectionManager", function () { this.beforeEach(async function () { this.timeout(15000); - waku = await createLightNode(); + waku = await createLightNode({ shardInfo: { shards: [0] } }); isPeerTopicConfigured = sinon.stub( waku.connectionManager as any, "isPeerTopicConfigured" diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index 7103aac760..638107ff3c 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -63,7 +63,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { waku.libp2p.getConnections(), waku.libp2p.peerStore, waku.libp2p.getProtocols(), - shardInfo + ensureShardingConfigured(shardInfo).shardInfo ); expect(peers.length).to.be.greaterThan(0); }); diff --git a/packages/tests/tests/peer_exchange.node.spec.ts b/packages/tests/tests/peer_exchange.node.spec.ts index fb7f3b52ee..be8e4fa442 100644 --- a/packages/tests/tests/peer_exchange.node.spec.ts +++ b/packages/tests/tests/peer_exchange.node.spec.ts @@ -7,7 +7,12 @@ import { PeerExchangeDiscovery, WakuPeerExchange } from "@waku/peer-exchange"; -import { createLightNode, Libp2pComponents } from "@waku/sdk"; +import { + createLightNode, + DEFAULT_CLUSTER_ID, + DefaultPubsubTopic, + Libp2pComponents +} from "@waku/sdk"; import { expect } from "chai"; import { @@ -34,13 +39,14 @@ describe("Peer Exchange", () => { await tearDownNodes([nwaku1, nwaku2], waku); }); - it("nwaku interop", async function () { + it.skip("nwaku interop", async function () { this.timeout(55_000); await nwaku1.start({ relay: true, discv5Discovery: true, - peerExchange: true + peerExchange: true, + clusterId: DEFAULT_CLUSTER_ID }); const enr = (await nwaku1.info()).enrUri; @@ -49,20 +55,23 @@ describe("Peer Exchange", () => { relay: true, discv5Discovery: true, peerExchange: true, - discv5BootstrapNode: enr + discv5BootstrapNode: enr, + clusterId: DEFAULT_CLUSTER_ID }); const nwaku1PeerId = await nwaku1.getPeerId(); const nwaku2PeerId = await nwaku2.getPeerId(); const nwaku2Ma = await nwaku2.getMultiaddrWithId(); - waku = await createLightNode(); + waku = await createLightNode({ shardInfo: { shards: [0] } }); await waku.start(); await waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec); await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, nwaku2PeerId); const components = waku.libp2p.components as unknown as Libp2pComponents; - const peerExchange = new WakuPeerExchange(components); + const peerExchange = new WakuPeerExchange(components, [ + DefaultPubsubTopic + ]); const numPeersToRequest = 1; @@ -149,7 +158,9 @@ describe("Peer Exchange", () => { void waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec); }, 1000); - return new PeerExchangeDiscovery(waku.libp2p.components); + return new PeerExchangeDiscovery(waku.libp2p.components, [ + DefaultPubsubTopic + ]); }, teardown: async () => { this.timeout(15000); diff --git a/packages/tests/tests/peer_exchange.optional.spec.ts b/packages/tests/tests/peer_exchange.optional.spec.ts index 6c940c252f..f61df4a04e 100644 --- a/packages/tests/tests/peer_exchange.optional.spec.ts +++ b/packages/tests/tests/peer_exchange.optional.spec.ts @@ -5,7 +5,7 @@ import { } from "@waku/core/lib/predefined_bootstrap_nodes"; import type { LightNode } from "@waku/interfaces"; import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; -import { createLightNode } from "@waku/sdk"; +import { createLightNode, DefaultPubsubTopic } from "@waku/sdk"; import { expect } from "chai"; import { tearDownNodes } from "../src"; @@ -33,7 +33,7 @@ describe("Peer Exchange", () => { libp2p: { peerDiscovery: [ bootstrap({ list: predefinedNodes }), - wakuPeerExchangeDiscovery() + wakuPeerExchangeDiscovery([DefaultPubsubTopic]) ] } }); diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index 1a461b6553..4a8f69547e 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -88,7 +88,7 @@ describe("Static Sharding: Peer Management", function () { libp2p: { peerDiscovery: [ bootstrap({ list: [nwaku3Ma.toString()] }), - wakuPeerExchangeDiscovery() + wakuPeerExchangeDiscovery(pubsubTopics) ] } }); @@ -163,7 +163,7 @@ describe("Static Sharding: Peer Management", function () { libp2p: { peerDiscovery: [ bootstrap({ list: [nwaku3Ma.toString()] }), - wakuPeerExchangeDiscovery() + wakuPeerExchangeDiscovery(pubsubTopicsToDial) ] } }); @@ -262,7 +262,7 @@ describe("Autosharding: Peer Management", function () { libp2p: { peerDiscovery: [ bootstrap({ list: [nwaku3Ma.toString()] }), - wakuPeerExchangeDiscovery() + wakuPeerExchangeDiscovery(pubsubTopics) ] } }); @@ -336,7 +336,7 @@ describe("Autosharding: Peer Management", function () { libp2p: { peerDiscovery: [ bootstrap({ list: [nwaku3Ma.toString()] }), - wakuPeerExchangeDiscovery() + wakuPeerExchangeDiscovery(pubsubTopicsToDial) ] } }); diff --git a/packages/tests/tests/sharding/running_nodes.spec.ts b/packages/tests/tests/sharding/running_nodes.spec.ts index 2ea82e6ff2..01558717dd 100644 --- a/packages/tests/tests/sharding/running_nodes.spec.ts +++ b/packages/tests/tests/sharding/running_nodes.spec.ts @@ -131,7 +131,7 @@ describe("Autosharding: Running Nodes", () => { this.timeout(15_000); waku = await createLightNode({ shardInfo: { - ...shardInfoBothShards, + clusterId: 0, // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards contentTopics: [ContentTopic, ContentTopic2] } diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index 6e2b6de68b..236ebc0f1f 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -280,9 +280,7 @@ export const ensureShardingConfigured = ( ) ); const shards = Array.from( - new Set( - contentTopics.map((topic) => contentTopicToShardIndex(topic, clusterId)) - ) + new Set(contentTopics.map((topic) => contentTopicToShardIndex(topic))) ); return { shardingParams: { clusterId, contentTopics },