From 69406bf1f0adc6af1f47089ab8acb7feb10a1e23 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Tue, 9 Jan 2024 23:34:30 -0800 Subject: [PATCH] reintroduce and deprecate named sharding (#1751) Co-authored-by: danisharora099 --- packages/core/src/lib/base_protocol.ts | 15 +- packages/core/src/lib/filter/index.ts | 11 +- packages/core/src/lib/light_push/index.ts | 2 +- packages/core/src/lib/message/version_0.ts | 5 +- packages/core/src/lib/store/index.ts | 2 +- packages/core/src/lib/waku.ts | 4 +- packages/interfaces/src/filter.ts | 4 +- packages/interfaces/src/message.ts | 4 + packages/interfaces/src/protocols.ts | 14 +- packages/message-encryption/src/ecies.ts | 28 +- packages/message-encryption/src/symmetric.ts | 24 +- packages/relay/src/index.ts | 10 +- packages/sdk/src/create.ts | 81 +++++- .../tests/filter/multiple_pubsub.node.spec.ts | 151 ++++++++++ packages/tests/tests/filter/utils.ts | 1 + .../light-push/multiple_pubsub.node.spec.ts | 169 ++++++++++- packages/tests/tests/light-push/utils.ts | 1 + .../tests/relay/multiple_pubsub.node.spec.ts | 273 ++++++++++++++++++ .../tests/tests/store/multiple_pubsub.spec.ts | 160 ++++++++++ packages/tests/tests/store/utils.ts | 1 + packages/utils/src/common/sharding.spec.ts | 11 + packages/utils/src/common/sharding.ts | 48 ++- 22 files changed, 949 insertions(+), 70 deletions(-) diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index b1f4620cc2..bed4d8fc89 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -5,8 +5,8 @@ import { Peer, PeerStore } from "@libp2p/interface/peer-store"; import type { IBaseProtocol, Libp2pComponents, - PubsubTopic, - ShardingParams + ProtocolCreateOptions, + PubsubTopic } from "@waku/interfaces"; import { DefaultPubsubTopic } from "@waku/interfaces"; import { shardInfoToPubsubTopics } from "@waku/utils"; @@ -103,9 +103,12 @@ export class BaseProtocol implements IBaseProtocol { return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers); } - initializePubsubTopic(shardInfo?: ShardingParams): PubsubTopic[] { - return shardInfo - ? shardInfoToPubsubTopics(shardInfo) - : [DefaultPubsubTopic]; + initializePubsubTopic(options?: ProtocolCreateOptions): PubsubTopic[] { + return ( + options?.pubsubTopics ?? + (options?.shardInfo + ? shardInfoToPubsubTopics(options.shardInfo) + : [DefaultPubsubTopic]) + ); } } diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index c54d1df39a..06caa77b51 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -281,7 +281,7 @@ class Filter extends BaseProtocol implements IReceiver { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(FilterCodecs.SUBSCRIBE, libp2p.components); - this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo); + this.pubsubTopics = this.initializePubsubTopic(options); libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => { log.error("Failed to register ", FilterCodecs.PUSH, e); @@ -291,11 +291,12 @@ class Filter extends BaseProtocol implements IReceiver { } async createSubscription( - pubsubTopicShardInfo?: SingleShardInfo + pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic ): Promise { - const pubsubTopic = pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic; + const pubsubTopic = + typeof pubsubTopicShardInfo == "string" + ? pubsubTopicShardInfo + : singleShardInfoToPubsubTopic(pubsubTopicShardInfo); ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics); diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 9d052b5b02..773e033565 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -49,7 +49,7 @@ class LightPush extends BaseProtocol implements ILightPush { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(LightPushCodec, libp2p.components); - this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo); + this.pubsubTopics = this.initializePubsubTopic(options); } private async preparePushMessage( diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index a55b82059f..d90335dc27 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -118,6 +118,7 @@ export class Encoder implements IEncoder { * messages. */ export function createEncoder({ + pubsubTopic, pubsubTopicShardInfo, contentTopic, ephemeral, @@ -126,7 +127,7 @@ export function createEncoder({ return new Encoder( contentTopic, ephemeral, - determinePubsubTopic(contentTopic, pubsubTopicShardInfo), + determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo), metaSetter ); } @@ -186,7 +187,7 @@ export class Decoder implements IDecoder { */ export function createDecoder( contentTopic: string, - pubsubTopicShardInfo?: SingleShardInfo + pubsubTopicShardInfo?: SingleShardInfo | PubsubTopic ): Decoder { return new Decoder( determinePubsubTopic(contentTopic, pubsubTopicShardInfo), diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 53fb0b0d19..1e06533311 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -79,7 +79,7 @@ class Store extends BaseProtocol implements IStore { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(StoreCodec, libp2p.components); - this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo); + this.pubsubTopics = this.initializePubsubTopic(options); } /** diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index c452cf6db6..1fc3f35643 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -55,6 +55,7 @@ export class WakuNode implements Waku { constructor( options: WakuOptions, + pubsubTopics: PubsubTopic[] = [], libp2p: Libp2p, pubsubShardInfo?: ShardingParams, store?: (libp2p: Libp2p) => IStore, @@ -63,7 +64,8 @@ export class WakuNode implements Waku { relay?: (libp2p: Libp2p) => IRelay ) { if (!pubsubShardInfo) { - this.pubsubTopics = [DefaultPubsubTopic]; + this.pubsubTopics = + pubsubTopics.length > 0 ? pubsubTopics : [DefaultPubsubTopic]; } else { this.pubsubTopics = shardInfoToPubsubTopics(pubsubShardInfo); } diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index a5d6798858..b6c342f32f 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -1,7 +1,7 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import type { IDecodedMessage, IDecoder, SingleShardInfo } from "./message.js"; -import type { ContentTopic } from "./misc.js"; +import type { ContentTopic, PubsubTopic } from "./misc.js"; import type { Callback, IBaseProtocol } from "./protocols.js"; import type { IReceiver } from "./receiver.js"; @@ -25,7 +25,7 @@ export interface IFilterSubscription { export type IFilter = IReceiver & IBaseProtocol & { createSubscription( - pubsubTopicShardInfo?: SingleShardInfo, + pubsubTopicShardInfo?: SingleShardInfo | PubsubTopic, peerId?: PeerId ): Promise; }; diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index 1a4dedeac9..1c8348239e 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -46,6 +46,10 @@ export interface IMetaSetter { } export interface EncoderOptions { + /** + * @deprecated + */ + pubsubTopic?: PubsubTopic; pubsubTopicShardInfo?: SingleShardInfo; /** The content topic to set on outgoing messages. */ contentTopic: string; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 33650580da..c7c223c6b4 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -5,6 +5,7 @@ import type { Peer, PeerStore } from "@libp2p/interface/peer-store"; import type { ShardInfo } from "./enr.js"; import type { CreateLibp2pOptions } from "./libp2p.js"; import type { IDecodedMessage } from "./message.js"; +import { PubsubTopic } from "./misc.js"; export enum Protocols { Relay = "relay", @@ -26,9 +27,20 @@ export type ContentTopicInfo = { contentTopics: string[]; }; -export type ShardingParams = ShardInfo | ContentTopicInfo; +export type ApplicationInfo = { + clusterId: number; + application: string; + version: string; +}; + +export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo; export type ProtocolCreateOptions = { + /** + * @deprecated + * Waku will stop supporting named sharding. Only static sharding and autosharding will be supported moving forward. + */ + pubsubTopics?: PubsubTopic[]; /** * Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future. * The format to specify a shard is: diff --git a/packages/message-encryption/src/ecies.ts b/packages/message-encryption/src/ecies.ts index 6a892034eb..31c9069656 100644 --- a/packages/message-encryption/src/ecies.ts +++ b/packages/message-encryption/src/ecies.ts @@ -1,13 +1,14 @@ import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0"; -import type { - EncoderOptions as BaseEncoderOptions, - IDecoder, - IEncoder, - IMessage, - IMetaSetter, - IProtoMessage, - PubsubTopic, - SingleShardInfo +import { + type EncoderOptions as BaseEncoderOptions, + DefaultPubsubTopic, + type IDecoder, + type IEncoder, + type IMessage, + type IMetaSetter, + type IProtoMessage, + type PubsubTopic, + type SingleShardInfo } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { determinePubsubTopic, Logger } from "@waku/utils"; @@ -79,6 +80,10 @@ class Encoder implements IEncoder { } export interface EncoderOptions extends BaseEncoderOptions { + /** + * @deprecated + */ + pubsubTopic?: PubsubTopic; /** The public key to encrypt the payload for. */ publicKey: Uint8Array; /** An optional private key to be used to sign the payload before encryption. */ @@ -98,6 +103,7 @@ export interface EncoderOptions extends BaseEncoderOptions { * in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/). */ export function createEncoder({ + pubsubTopic, pubsubTopicShardInfo, contentTopic, publicKey, @@ -106,7 +112,7 @@ export function createEncoder({ metaSetter }: EncoderOptions): Encoder { return new Encoder( - determinePubsubTopic(contentTopic, pubsubTopicShardInfo), + determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo), contentTopic, publicKey, sigPrivKey, @@ -194,7 +200,7 @@ class Decoder extends DecoderV0 implements IDecoder { export function createDecoder( contentTopic: string, privateKey: Uint8Array, - pubsubTopicShardInfo?: SingleShardInfo + pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic ): Decoder { return new Decoder( determinePubsubTopic(contentTopic, pubsubTopicShardInfo), diff --git a/packages/message-encryption/src/symmetric.ts b/packages/message-encryption/src/symmetric.ts index 841784db55..60011b8e36 100644 --- a/packages/message-encryption/src/symmetric.ts +++ b/packages/message-encryption/src/symmetric.ts @@ -1,13 +1,14 @@ import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0"; -import type { - EncoderOptions as BaseEncoderOptions, - IDecoder, - IEncoder, - IMessage, - IMetaSetter, - IProtoMessage, - PubsubTopic, - SingleShardInfo +import { + type EncoderOptions as BaseEncoderOptions, + DefaultPubsubTopic, + type IDecoder, + type IEncoder, + type IMessage, + type IMetaSetter, + type IProtoMessage, + type PubsubTopic, + type SingleShardInfo } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { determinePubsubTopic, Logger } from "@waku/utils"; @@ -98,6 +99,7 @@ export interface EncoderOptions extends BaseEncoderOptions { * in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/). */ export function createEncoder({ + pubsubTopic = DefaultPubsubTopic, pubsubTopicShardInfo, contentTopic, symKey, @@ -106,7 +108,7 @@ export function createEncoder({ metaSetter }: EncoderOptions): Encoder { return new Encoder( - determinePubsubTopic(contentTopic, pubsubTopicShardInfo), + determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo), contentTopic, symKey, sigPrivKey, @@ -194,7 +196,7 @@ class Decoder extends DecoderV0 implements IDecoder { export function createDecoder( contentTopic: string, symKey: Uint8Array, - pubsubTopicShardInfo?: SingleShardInfo + pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic ): Decoder { return new Decoder( determinePubsubTopic(contentTopic, pubsubTopicShardInfo), diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 26b5055722..682f028e61 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -9,10 +9,10 @@ import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; import type { PeerId } from "@libp2p/interface/peer-id"; import type { PubSub as Libp2pPubsub } from "@libp2p/interface/pubsub"; import { sha256 } from "@noble/hashes/sha256"; -import { DefaultPubsubTopic } from "@waku/interfaces"; import { ActiveSubscriptions, Callback, + DefaultPubsubTopic, IAsyncIterator, IDecodedMessage, IDecoder, @@ -72,9 +72,11 @@ class Relay implements IRelay { } this.gossipSub = libp2p.services.pubsub as GossipSub; - this.pubsubTopics = options?.shardInfo - ? new Set(shardInfoToPubsubTopics(options.shardInfo)) - : new Set([DefaultPubsubTopic]); + this.pubsubTopics = new Set( + options?.shardInfo + ? shardInfoToPubsubTopics(options.shardInfo) + : options?.pubsubTopics ?? [DefaultPubsubTopic] + ); if (this.gossipSub.isStarted()) { this.subscribeToAllTopics(); diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index 01d254f0ca..63e33ee482 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -39,16 +39,31 @@ const DEFAULT_NODE_REQUIREMENTS = { export { Libp2pComponents }; +const ensureShardingConfigured = (shardInfo: ShardingParams): void => { + if ( + ("shards" in shardInfo && shardInfo.shards.length < 1) || + ("contentTopics" in shardInfo && shardInfo.contentTopics.length < 1) + ) { + throw new Error( + "Missing required configuration options for static sharding or autosharding." + ); + } +}; + /** - * Create a Waku node that uses Waku Light Push, Filter and Store to send and - * receive messages, enabling low resource consumption. - * Uses Waku Filter V2 by default. + * Create a Waku node configured to use autosharding or static sharding. */ -export async function createLightNode( - options?: ProtocolCreateOptions & WakuOptions +export async function createNode( + options?: ProtocolCreateOptions & WakuOptions & Partial ): Promise { options = options ?? {}; + if (!options.shardInfo) { + throw new Error("Shard info must be set"); + } + + ensureShardingConfigured(options.shardInfo); + const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { @@ -57,8 +72,8 @@ export async function createLightNode( } const libp2p = await defaultLibp2p( - options.shardInfo, undefined, + wakuGossipSub(options), libp2pOptions, options?.userAgent ); @@ -69,6 +84,50 @@ export async function createLightNode( return new WakuNode( options ?? {}, + [], + libp2p, + options.shardInfo, + store, + lightPush, + filter + ) as LightNode; +} + +/** + * Create a Waku node that uses Waku Light Push, Filter and Store to send and + * receive messages, enabling low resource consumption. + * Uses Waku Filter V2 by default. + */ +export async function createLightNode( + options?: ProtocolCreateOptions & WakuOptions +): Promise { + options = options ?? {}; + + if (options.shardInfo) { + ensureShardingConfigured(options.shardInfo); + } + + const libp2pOptions = options?.libp2p ?? {}; + const peerDiscovery = libp2pOptions.peerDiscovery ?? []; + if (options?.defaultBootstrap) { + peerDiscovery.push(...defaultPeerDiscoveries()); + Object.assign(libp2pOptions, { peerDiscovery }); + } + + const libp2p = await defaultLibp2p( + options.shardInfo, + wakuGossipSub(options), + libp2pOptions, + options?.userAgent + ); + + const store = wakuStore(options); + const lightPush = wakuLightPush(options); + const filter = wakuFilter(options); + + return new WakuNode( + options ?? {}, + options.pubsubTopics, libp2p, options.shardInfo, store, @@ -86,6 +145,10 @@ export async function createRelayNode( ): Promise { options = options ?? {}; + if (options.shardInfo) { + ensureShardingConfigured(options.shardInfo); + } + const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { @@ -104,6 +167,7 @@ export async function createRelayNode( return new WakuNode( options, + options.pubsubTopics, libp2p, options.shardInfo, undefined, @@ -131,6 +195,10 @@ export async function createFullNode( ): Promise { options = options ?? {}; + if (options.shardInfo) { + ensureShardingConfigured(options.shardInfo); + } + const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { @@ -152,6 +220,7 @@ export async function createFullNode( return new WakuNode( options ?? {}, + options.pubsubTopics, libp2p, options.shardInfo, store, diff --git a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts index 9875e12d49..ba43471fbe 100644 --- a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts @@ -354,3 +354,154 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { } }); }); + +describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(30000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + let subscription: IFilterSubscription; + let messageCollector: MessageCollector; + + const customPubsubTopic1 = singleShardInfoToPubsubTopic({ + clusterId: 3, + shard: 1 + }); + const customPubsubTopic2 = singleShardInfoToPubsubTopic({ + clusterId: 3, + shard: 2 + }); + const customContentTopic1 = "/test/2/waku-filter"; + const customContentTopic2 = "/test/3/waku-filter"; + const customEncoder1 = createEncoder({ + pubsubTopic: customPubsubTopic1, + contentTopic: customContentTopic1 + }); + const customDecoder1 = createDecoder(customContentTopic1, customPubsubTopic1); + const customEncoder2 = createEncoder({ + pubsubTopic: customPubsubTopic2, + contentTopic: customContentTopic2 + }); + const customDecoder2 = createDecoder(customContentTopic2, customPubsubTopic2); + + this.beforeEach(async function () { + this.timeout(15000); + [nwaku, waku] = await runNodes(this, [ + customPubsubTopic1, + customPubsubTopic2 + ]); + subscription = await waku.filter.createSubscription(customPubsubTopic1); + messageCollector = new MessageCollector(); + }); + + this.afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); + }); + + it("Subscribe and receive messages on custom pubsubtopic", async function () { + await subscription.subscribe([customDecoder1], messageCollector.callback); + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: customPubsubTopic1, + expectedMessageText: "M1" + }); + }); + + it("Subscribe and receive messages on 2 different pubsubtopics", async function () { + await subscription.subscribe([customDecoder1], messageCollector.callback); + + // Subscribe from the same lightnode to the 2nd pubsubtopic + const subscription2 = await waku.filter.createSubscription( + pubsubTopicToSingleShardInfo(customPubsubTopic2) + ); + + const messageCollector2 = new MessageCollector(); + + await subscription2.subscribe([customDecoder2], messageCollector2.callback); + + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + expect(await messageCollector2.waitForMessages(1)).to.eq(true); + + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: customPubsubTopic1, + expectedMessageText: "M1" + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: customPubsubTopic2, + expectedMessageText: "M2" + }); + }); + + it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { + await subscription.subscribe([customDecoder1], messageCollector.callback); + + // Set up and start a new nwaku node with customPubsubTopic1 + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + pubsubTopic: [customPubsubTopic2] + }); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + + // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic + const subscription2 = await waku.filter.createSubscription( + pubsubTopicToSingleShardInfo(customPubsubTopic2), + await nwaku2.getPeerId() + ); + await nwaku2.ensureSubscriptions([customPubsubTopic2]); + + const messageCollector2 = new MessageCollector(); + + await subscription2.subscribe([customDecoder2], messageCollector2.callback); + + // Making sure that messages are send and reveiced for both subscriptions + // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 + while ( + !(await messageCollector.waitForMessages(1, { + pubsubTopic: customPubsubTopic1 + })) || + !(await messageCollector2.waitForMessages(1, { + pubsubTopic: customPubsubTopic2 + })) + ) { + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); + } + + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: customPubsubTopic1, + expectedMessageText: "M1" + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: customPubsubTopic2, + expectedMessageText: "M2" + }); + }); + + it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () { + // this subscription object is set up with the `customPubsubTopic` but we're passing it a Decoder with the `DefaultPubsubTopic` + try { + await subscription.subscribe([customDecoder2], messageCollector.callback); + } catch (error) { + expect((error as Error).message).to.include( + "Pubsub topic not configured" + ); + } + }); +}); diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 2541184101..c7a929cfa2 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -63,6 +63,7 @@ export async function runNodes( const waku_options = { staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, + pubsubTopics: shardInfo ? undefined : pubsubTopics, ...((pubsubTopics.length !== 1 || pubsubTopics[0] !== DefaultPubsubTopic) && { shardInfo: shardInfo diff --git a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts index 32d7bad791..19c08ccf13 100644 --- a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts @@ -184,7 +184,7 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { // When using lightpush, we have to use a cluster id of 1 because that is the default cluster id for autosharding // With a different cluster id, we never find a viable peer const clusterId = 1; - const customContentTopic1 = "/waku/2/content/utf8"; + const customContentTopic1 = "/waku/2/content/test.js"; const customContentTopic2 = "/myapp/1/latest/proto"; const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( customContentTopic1, @@ -194,19 +194,17 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { customContentTopic2, clusterId ); - const contentTopicInfo: ContentTopicInfo = { + const shardInfo: ContentTopicInfo = { clusterId, contentTopics: [customContentTopic1, customContentTopic2] }; const customEncoder1 = createEncoder({ contentTopic: customContentTopic1, - pubsubTopicShardInfo: { - clusterId - } + pubsubTopicShardInfo: shardInfo }); const customEncoder2 = createEncoder({ contentTopic: customContentTopic2, - pubsubTopicShardInfo: { clusterId } + pubsubTopicShardInfo: shardInfo }); let nimPeerId: PeerId; @@ -216,7 +214,7 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { [nwaku, waku] = await runNodes( this, [autoshardingPubsubTopic1, autoshardingPubsubTopic2], - contentTopicInfo + shardInfo ); messageCollector = new MessageCollector(nwaku); nimPeerId = await nwaku.getPeerId(); @@ -331,3 +329,160 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { }); }); }); + +describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () { + this.timeout(30000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + let messageCollector: MessageCollector; + + // When using lightpush, we have to use a cluster id of 1 because that is the default cluster id for autosharding + // With a different cluster id, we never find a viable peer + const clusterId = 1; + const customContentTopic1 = "/waku/2/content/utf8"; + const customContentTopic2 = "/myapp/1/latest/proto"; + const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( + customContentTopic1, + clusterId + ); + const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( + customContentTopic2, + clusterId + ); + const contentTopicInfo: ContentTopicInfo = { + clusterId, + contentTopics: [customContentTopic1, customContentTopic2] + }; + const customEncoder1 = createEncoder({ + contentTopic: customContentTopic1, + pubsubTopicShardInfo: { + clusterId + } + }); + const customEncoder2 = createEncoder({ + contentTopic: customContentTopic2, + pubsubTopicShardInfo: { clusterId } + }); + + let nimPeerId: PeerId; + + this.beforeEach(async function () { + this.timeout(15000); + [nwaku, waku] = await runNodes( + this, + [autoshardingPubsubTopic1, autoshardingPubsubTopic2], + contentTopicInfo + ); + messageCollector = new MessageCollector(nwaku); + nimPeerId = await nwaku.getPeerId(); + }); + + this.afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); + }); + + it("Push message on custom pubsubTopic", async function () { + const pushResponse = await waku.lightPush.send(customEncoder1, { + payload: utf8ToBytes(messageText) + }); + + expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); + + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: autoshardingPubsubTopic1 + }) + ).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: customContentTopic1 + }); + }); + + it("Subscribe and receive messages on 2 different pubsubtopics", async function () { + const pushResponse1 = await waku.lightPush.send(customEncoder1, { + payload: utf8ToBytes("M1") + }); + const pushResponse2 = await waku.lightPush.send(customEncoder2, { + payload: utf8ToBytes("M2") + }); + expect(pushResponse1.recipients[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse2.recipients[0].toString()).to.eq(nimPeerId.toString()); + + const messageCollector2 = new MessageCollector(nwaku); + + expect( + await messageCollector.waitForMessages(1, { + pubsubTopic: autoshardingPubsubTopic1 + }) + ).to.eq(true); + + expect( + await messageCollector2.waitForMessages(1, { + pubsubTopic: autoshardingPubsubTopic2 + }) + ).to.eq(true); + + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: autoshardingPubsubTopic1 + }); + messageCollector2.verifyReceivedMessage(0, { + expectedMessageText: "M2", + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: autoshardingPubsubTopic2 + }); + }); + + it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { + // Set up and start a new nwaku node with Default PubsubTopic + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + pubsubTopic: [autoshardingPubsubTopic2] + }); + await nwaku2.ensureSubscriptions([autoshardingPubsubTopic2]); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const messageCollector2 = new MessageCollector(nwaku2); + + let pushResponse1: SendResult; + let pushResponse2: SendResult; + // Making sure that we send messages to both nwaku nodes + // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 + while ( + !(await messageCollector.waitForMessages(1, { + pubsubTopic: autoshardingPubsubTopic1 + })) || + !(await messageCollector2.waitForMessages(1, { + pubsubTopic: autoshardingPubsubTopic2 + })) || + pushResponse1!.recipients[0].toString() === + pushResponse2!.recipients[0].toString() + ) { + pushResponse1 = await waku.lightPush.send(customEncoder1, { + payload: utf8ToBytes("M1") + }); + pushResponse2 = await waku.lightPush.send(customEncoder2, { + payload: utf8ToBytes("M2") + }); + } + + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: autoshardingPubsubTopic1 + }); + messageCollector2.verifyReceivedMessage(0, { + expectedMessageText: "M2", + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: autoshardingPubsubTopic2 + }); + }); +}); diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index 635d8e3989..435b7b50d2 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -35,6 +35,7 @@ export async function runNodes( pubsubTopics[0] !== DefaultPubsubTopic) && { shardInfo: shardInfo }), + pubsubTopics: shardInfo ? undefined : pubsubTopics, staticNoiseKey: NOISE_KEY_1 }); await waku.start(); diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index f1cfaf886f..f070eef140 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -638,3 +638,276 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { expect(waku2ReceivedMsg.pubsubTopic).to.eq(autoshardingPubsubTopic1); }); }); + +describe("Waku Relay (named sharding), multiple pubsub topics", function () { + this.timeout(15000); + let waku1: RelayNode; + let waku2: RelayNode; + let waku3: RelayNode; + + const customPubsubTopic1 = singleShardInfoToPubsubTopic({ + clusterId: 3, + shard: 1 + }); + const customPubsubTopic2 = singleShardInfoToPubsubTopic({ + clusterId: 3, + shard: 2 + }); + const customContentTopic1 = "/test/2/waku-relay/utf8"; + const customContentTopic2 = "/test/3/waku-relay/utf8"; + const customEncoder1 = createEncoder({ + pubsubTopic: customPubsubTopic1, + contentTopic: customContentTopic1 + }); + const customDecoder1 = createDecoder(customContentTopic1, customPubsubTopic1); + const customEncoder2 = createEncoder({ + pubsubTopic: customPubsubTopic2, + contentTopic: customContentTopic2 + }); + const customDecoder2 = createDecoder(customContentTopic2, customPubsubTopic2); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([], [waku1, waku2, waku3]); + }); + + [ + { + pubsub: customPubsubTopic1, + encoder: customEncoder1, + decoder: customDecoder1 + }, + { + pubsub: customPubsubTopic2, + encoder: customEncoder2, + decoder: customDecoder2 + } + ].forEach((testItem) => { + it(`3 nodes on ${testItem.pubsub} topic`, async function () { + const [msgCollector1, msgCollector2, msgCollector3] = Array(3) + .fill(null) + .map(() => new MessageCollector()); + + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + pubsubTopics: [testItem.pubsub], + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [testItem.pubsub], + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [testItem.pubsub], + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), + waitForRemotePeer(waku3, [Protocols.Relay]) + ]); + + await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback); + await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback); + await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback); + + // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network + const relayResponse1 = await waku1.relay.send(testItem.encoder, { + payload: utf8ToBytes("M1") + }); + const relayResponse2 = await waku2.relay.send(testItem.encoder, { + payload: utf8ToBytes("M2") + }); + const relayResponse3 = await waku3.relay.send(testItem.encoder, { + payload: utf8ToBytes("M3") + }); + + expect(relayResponse1.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(relayResponse3.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + waku1.libp2p.peerId.toString() + ); + expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + waku3.libp2p.peerId.toString() + ); + + expect(await msgCollector1.waitForMessages(2, { exact: true })).to.eq( + true + ); + expect(await msgCollector2.waitForMessages(2, { exact: true })).to.eq( + true + ); + expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq( + true + ); + + expect( + msgCollector1.hasMessage(testItem.encoder.contentTopic, "M2") + ).to.eq(true); + expect( + msgCollector1.hasMessage(testItem.encoder.contentTopic, "M3") + ).to.eq(true); + expect( + msgCollector2.hasMessage(testItem.encoder.contentTopic, "M1") + ).to.eq(true); + expect( + msgCollector2.hasMessage(testItem.encoder.contentTopic, "M3") + ).to.eq(true); + expect( + msgCollector3.hasMessage(testItem.encoder.contentTopic, "M1") + ).to.eq(true); + expect( + msgCollector3.hasMessage(testItem.encoder.contentTopic, "M2") + ).to.eq(true); + }); + }); + + it("Nodes with multiple pubsub topic", async function () { + const [msgCollector1, msgCollector2, msgCollector3] = Array(3) + .fill(null) + .map(() => new MessageCollector()); + + // Waku1 and waku2 are using multiple pubsub topis + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + pubsubTopics: [customPubsubTopic1, customPubsubTopic2], + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [customPubsubTopic1, customPubsubTopic2], + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [customPubsubTopic1], + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), + waitForRemotePeer(waku3, [Protocols.Relay]) + ]); + + await waku1.relay.subscribe( + [customDecoder1, customDecoder2], + msgCollector1.callback + ); + await waku2.relay.subscribe( + [customDecoder1, customDecoder2], + msgCollector2.callback + ); + await waku3.relay.subscribe([customDecoder1], msgCollector3.callback); + + // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network + // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic + await waku1.relay.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku1.relay.send(customEncoder2, { payload: utf8ToBytes("M2") }); + await waku2.relay.send(customEncoder1, { payload: utf8ToBytes("M3") }); + await waku2.relay.send(customEncoder2, { payload: utf8ToBytes("M4") }); + await waku3.relay.send(customEncoder1, { payload: utf8ToBytes("M5") }); + await waku3.relay.send(customEncoder2, { payload: utf8ToBytes("M6") }); + + expect(await msgCollector1.waitForMessages(3, { exact: true })).to.eq(true); + expect(await msgCollector2.waitForMessages(3, { exact: true })).to.eq(true); + expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic1, "M3")).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic2, "M4")).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic1, "M5")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic1, "M1")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic2, "M2")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic1, "M5")).to.eq(true); + expect(msgCollector3.hasMessage(customContentTopic1, "M1")).to.eq(true); + expect(msgCollector3.hasMessage(customContentTopic1, "M3")).to.eq(true); + }); + + it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () { + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + pubsubTopics: [customPubsubTopic1], + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [customPubsubTopic1], + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]) + ]); + + const messageText = "Communicating using a custom pubsub topic"; + + const waku2ReceivedMsgPromise: Promise = new Promise( + (resolve) => { + void waku2.relay.subscribe([customDecoder1], resolve); + } + ); + + // The promise **fails** if we receive a message on the default + // pubsub topic. + const waku3NoMsgPromise: Promise = new Promise( + (resolve, reject) => { + void waku3.relay.subscribe([TestDecoder], reject); + setTimeout(resolve, 1000); + } + ); + + await waku1.relay.send(customEncoder1, { + payload: utf8ToBytes(messageText) + }); + + const waku2ReceivedMsg = await waku2ReceivedMsgPromise; + await waku3NoMsgPromise; + + expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText); + expect(waku2ReceivedMsg.pubsubTopic).to.eq(customPubsubTopic1); + }); +}); diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index e8c3fc9665..74ad7219f1 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -321,3 +321,163 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { } }); }); + +describe("Waku Store (named sharding), custom pubsub topic", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + + const customDecoder1 = createDecoder( + customContentTopic1, + customShardedPubsubTopic1 + ); + const customDecoder2 = createDecoder( + customContentTopic2, + customShardedPubsubTopic2 + ); + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.start({ + store: true, + pubsubTopic: [customShardedPubsubTopic1, customShardedPubsubTopic2], + relay: true + }); + await nwaku.ensureSubscriptions([ + customShardedPubsubTopic1, + customShardedPubsubTopic2 + ]); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); + }); + + it("Generator, custom pubsub topic", async function () { + await sendMessages( + nwaku, + totalMsgs, + customContentTopic1, + customShardedPubsubTopic1 + ); + waku = await startAndConnectLightNode(nwaku, [ + customShardedPubsubTopic1, + customShardedPubsubTopic2 + ]); + const messages = await processQueriedMessages( + waku, + [customDecoder1], + customShardedPubsubTopic1 + ); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result).to.not.eq(-1); + }); + + it("Generator, 2 different pubsubtopics", async function () { + this.timeout(10000); + + const totalMsgs = 10; + await sendMessages( + nwaku, + totalMsgs, + customContentTopic1, + customShardedPubsubTopic1 + ); + await sendMessages( + nwaku, + totalMsgs, + customContentTopic2, + customShardedPubsubTopic2 + ); + + waku = await startAndConnectLightNode(nwaku, [ + customShardedPubsubTopic1, + customShardedPubsubTopic2 + ]); + + const customMessages = await processQueriedMessages( + waku, + [customDecoder1], + customShardedPubsubTopic1 + ); + expect(customMessages?.length).eq(totalMsgs); + const result1 = customMessages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result1).to.not.eq(-1); + + const testMessages = await processQueriedMessages( + waku, + [customDecoder2], + customShardedPubsubTopic2 + ); + expect(testMessages?.length).eq(totalMsgs); + const result2 = testMessages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result2).to.not.eq(-1); + }); + + it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () { + this.timeout(10000); + + // Set up and start a new nwaku node with Default Pubsubtopic + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + store: true, + pubsubTopic: [customShardedPubsubTopic2], + relay: true + }); + await nwaku2.ensureSubscriptions([customShardedPubsubTopic2]); + + const totalMsgs = 10; + await sendMessages( + nwaku, + totalMsgs, + customContentTopic1, + customShardedPubsubTopic1 + ); + await sendMessages( + nwaku2, + totalMsgs, + customContentTopic2, + customShardedPubsubTopic2 + ); + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1, + pubsubTopics: [customShardedPubsubTopic1, customShardedPubsubTopic2] + }); + await waku.start(); + + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + let customMessages: IMessage[] = []; + let testMessages: IMessage[] = []; + + while ( + customMessages.length != totalMsgs || + testMessages.length != totalMsgs + ) { + customMessages = await processQueriedMessages( + waku, + [customDecoder1], + customShardedPubsubTopic1 + ); + testMessages = await processQueriedMessages( + waku, + [customDecoder2], + customShardedPubsubTopic2 + ); + } + }); +}); diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts index 76ce57f09f..252df84cb0 100644 --- a/packages/tests/tests/store/utils.ts +++ b/packages/tests/tests/store/utils.ts @@ -111,6 +111,7 @@ export async function startAndConnectLightNode( pubsubTopics[0] !== DefaultPubsubTopic) && { shardInfo: shardInfo }), + pubsubTopics: shardInfo ? undefined : pubsubTopics, staticNoiseKey: NOISE_KEY_1 }); await waku.start(); diff --git a/packages/utils/src/common/sharding.spec.ts b/packages/utils/src/common/sharding.spec.ts index 06484615ac..ce1f0017b1 100644 --- a/packages/utils/src/common/sharding.spec.ts +++ b/packages/utils/src/common/sharding.spec.ts @@ -128,3 +128,14 @@ describe("contentTopicsByPubsubTopic", () => { } }); }); + +describe("contentTopicsByPubsubTopic", () => { + it("groups content topics by expected pubsub topic", () => { + const contentTopics = ["/toychat/2/huilong/proto", "/myapp/1/latest/proto"]; + const grouped = contentTopicsByPubsubTopic(contentTopics); + for (const contentTopic of contentTopics) { + const pubsubTopic = contentTopicToPubsubTopic(contentTopic); + expect(grouped.get(pubsubTopic)?.includes(contentTopic)).to.be.true; + } + }); +}); diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index f4a8afb71b..f6f3cd7b34 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -23,14 +23,31 @@ export const shardInfoToPubsubTopics = ( if (shardInfo.clusterId === undefined) throw new Error("Cluster ID must be specified"); if ("contentTopics" in shardInfo) { - return shardInfo.contentTopics.map((contentTopic) => - contentTopicToPubsubTopic(contentTopic, shardInfo.clusterId) + // Autosharding: explicitly defined content topics + return Array.from( + new Set( + shardInfo.contentTopics.map((contentTopic) => + contentTopicToPubsubTopic(contentTopic, shardInfo.clusterId) + ) + ) + ); + } else if ("shards" in shardInfo) { + // Static sharding + if (shardInfo.shards === undefined) throw new Error("Invalid shard"); + return Array.from( + new Set( + shardInfo.shards.map( + (index) => `/waku/2/rs/${shardInfo.clusterId}/${index}` + ) + ) ); } else { - if (shardInfo.shards === undefined) throw new Error("Invalid shard"); - return shardInfo.shards.map( - (index) => `/waku/2/rs/${shardInfo.clusterId}/${index}` - ); + // Autosharding: single shard from application and version + return [ + contentTopicToPubsubTopic( + `/${shardInfo.application}/${shardInfo.version}/default/default` + ) + ]; } }; @@ -183,11 +200,18 @@ export function contentTopicsByPubsubTopic( */ export function determinePubsubTopic( contentTopic: string, - pubsubTopicShardInfo?: SingleShardInfo + pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic ): string { - return pubsubTopicShardInfo - ? pubsubTopicShardInfo.shard - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : contentTopicToPubsubTopic(contentTopic, pubsubTopicShardInfo.clusterId) - : DefaultPubsubTopic; + if (typeof pubsubTopicShardInfo == "string") { + return pubsubTopicShardInfo; + } else { + return pubsubTopicShardInfo + ? pubsubTopicShardInfo.shard + ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) + : contentTopicToPubsubTopic( + contentTopic, + pubsubTopicShardInfo.clusterId + ) + : DefaultPubsubTopic; + } }