diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 1546fda480..f6fcd4663c 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -2,9 +2,16 @@ import type { Libp2p } from "@libp2p/interface"; import type { Stream } from "@libp2p/interface/connection"; import type { PeerId } from "@libp2p/interface/peer-id"; import { Peer, PeerStore } from "@libp2p/interface/peer-store"; -import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces"; +import type { + IBaseProtocol, + Libp2pComponents, + PubsubTopic, + ShardInfo +} from "@waku/interfaces"; +import { shardInfoToPubsubTopics } from "@waku/utils"; import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p"; +import { DefaultPubsubTopic } from "./constants.js"; import { filterPeers } from "./filterPeers.js"; import { StreamManager } from "./stream_manager.js"; @@ -89,4 +96,10 @@ export class BaseProtocol implements IBaseProtocol { // Filter the peers based on the specified criteria return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers); } + + initializePubsubTopic(shardInfo?: ShardInfo): PubsubTopic[] { + return shardInfo + ? shardInfoToPubsubTopics(shardInfo) + : [DefaultPubsubTopic]; + } } diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index d871238f84..93c365dcd8 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -14,12 +14,14 @@ import type { PeerIdStr, ProtocolCreateOptions, PubsubTopic, + SingleShardInfo, Unsubscribe } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { ensurePubsubTopicIsConfigured, groupByContentTopic, + singleShardInfoToPubsubTopic, toAsyncIterator } from "@waku/utils"; import { Logger } from "@waku/utils"; @@ -279,7 +281,7 @@ class Filter extends BaseProtocol implements IReceiver { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(FilterCodecs.SUBSCRIBE, libp2p.components); - this.pubsubTopics = options?.pubsubTopics || [DefaultPubsubTopic]; + this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo); libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => { log.error("Failed to register ", FilterCodecs.PUSH, e); @@ -289,8 +291,12 @@ class Filter extends BaseProtocol implements IReceiver { } async createSubscription( - pubsubTopic: string = DefaultPubsubTopic + pubsubTopicShardInfo?: SingleShardInfo ): Promise { + const pubsubTopic = pubsubTopicShardInfo + ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) + : DefaultPubsubTopic; + ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics); //TODO: get a relevant peer for the topic/shard diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index df63b891a5..b63c420fb2 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -2,7 +2,7 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import type { PeerStore } from "@libp2p/interface/peer-store"; import type { IRelay, PeerIdStr } from "@waku/interfaces"; import type { KeepAliveOptions } from "@waku/interfaces"; -import { Logger } from "@waku/utils"; +import { Logger, pubsubTopicToSingleShardInfo } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import type { PingService } from "libp2p/ping"; @@ -129,7 +129,7 @@ export class KeepAliveManager { if (!meshPeers.includes(peerIdStr)) continue; const encoder = createEncoder({ - pubsubTopic: topic, + pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(topic), contentTopic: RelayPingContentTopic, ephemeral: true }); diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 7ba0020bf9..9d052b5b02 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -22,7 +22,6 @@ import { pipe } from "it-pipe"; import { Uint8ArrayList } from "uint8arraylist"; import { BaseProtocol } from "../base_protocol.js"; -import { DefaultPubsubTopic } from "../constants.js"; import { PushRpc } from "./push_rpc.js"; @@ -50,7 +49,7 @@ class LightPush extends BaseProtocol implements ILightPush { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(LightPushCodec, libp2p.components); - this.pubsubTopics = options?.pubsubTopics ?? [DefaultPubsubTopic]; + this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo); } private async preparePushMessage( diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index 8a43007b57..a67b16a0dc 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -7,10 +7,11 @@ import type { IMetaSetter, IProtoMessage, IRateLimitProof, - PubsubTopic + PubsubTopic, + SingleShardInfo } from "@waku/interfaces"; import { proto_message as proto } from "@waku/proto"; -import { Logger } from "@waku/utils"; +import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; import { DefaultPubsubTopic } from "../constants.js"; @@ -119,12 +120,19 @@ export class Encoder implements IEncoder { * messages. */ export function createEncoder({ - pubsubTopic = DefaultPubsubTopic, + pubsubTopicShardInfo, contentTopic, ephemeral, metaSetter }: EncoderOptions): Encoder { - return new Encoder(contentTopic, ephemeral, pubsubTopic, metaSetter); + return new Encoder( + contentTopic, + ephemeral, + pubsubTopicShardInfo?.index + ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) + : DefaultPubsubTopic, + metaSetter + ); } export class Decoder implements IDecoder { @@ -182,7 +190,12 @@ export class Decoder implements IDecoder { */ export function createDecoder( contentTopic: string, - pubsubTopic: PubsubTopic = DefaultPubsubTopic + pubsubTopicShardInfo?: SingleShardInfo ): Decoder { - return new Decoder(pubsubTopic, contentTopic); + return new Decoder( + pubsubTopicShardInfo?.index + ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) + : DefaultPubsubTopic, + contentTopic + ); } diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 5647e68d72..53fb0b0d19 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -19,7 +19,6 @@ import { pipe } from "it-pipe"; import { Uint8ArrayList } from "uint8arraylist"; import { BaseProtocol } from "../base_protocol.js"; -import { DefaultPubsubTopic } from "../constants.js"; import { toProtoMessage } from "../to_proto_message.js"; import { HistoryRpc, PageDirection, Params } from "./history_rpc.js"; @@ -80,7 +79,7 @@ class Store extends BaseProtocol implements IStore { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(StoreCodec, libp2p.components); - this.pubsubTopics = options?.pubsubTopics ?? [DefaultPubsubTopic]; + this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo); } /** diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index 0affa301cd..4d6b7fa038 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -8,12 +8,14 @@ import type { IStore, Libp2p, PubsubTopic, + ShardInfo, Waku } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; -import { Logger } from "@waku/utils"; +import { Logger, shardInfoToPubsubTopics } from "@waku/utils"; import { ConnectionManager } from "./connection_manager.js"; +import { DefaultPubsubTopic } from "./constants.js"; export const DefaultPingKeepAliveValueSecs = 5 * 60; export const DefaultRelayKeepAliveValueSecs = 5 * 60; @@ -50,16 +52,23 @@ export class WakuNode implements Waku { public filter?: IFilter; public lightPush?: ILightPush; public connectionManager: ConnectionManager; + public readonly pubsubTopics: PubsubTopic[]; constructor( options: WakuOptions, - public readonly pubsubTopics: PubsubTopic[], libp2p: Libp2p, + pubsubShardInfo?: ShardInfo, store?: (libp2p: Libp2p) => IStore, lightPush?: (libp2p: Libp2p) => ILightPush, filter?: (libp2p: Libp2p) => IFilter, relay?: (libp2p: Libp2p) => IRelay ) { + if (!pubsubShardInfo) { + this.pubsubTopics = [DefaultPubsubTopic]; + } else { + this.pubsubTopics = shardInfoToPubsubTopics(pubsubShardInfo); + } + this.libp2p = libp2p; if (store) { @@ -88,7 +97,7 @@ export class WakuNode implements Waku { peerId, libp2p, { pingKeepAlive, relayKeepAlive }, - pubsubTopics, + this.pubsubTopics, this.relay ); diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 34dad0f87c..a5d6798858 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -1,6 +1,6 @@ import type { PeerId } from "@libp2p/interface/peer-id"; -import type { IDecodedMessage, IDecoder } from "./message.js"; +import type { IDecodedMessage, IDecoder, SingleShardInfo } from "./message.js"; import type { ContentTopic } from "./misc.js"; import type { Callback, IBaseProtocol } from "./protocols.js"; import type { IReceiver } from "./receiver.js"; @@ -25,7 +25,7 @@ export interface IFilterSubscription { export type IFilter = IReceiver & IBaseProtocol & { createSubscription( - pubsubTopic?: string, + pubsubTopicShardInfo?: SingleShardInfo, peerId?: PeerId ): Promise; }; diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index 060a80fe8c..137225cd58 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -1,5 +1,10 @@ import type { PubsubTopic } from "./misc.js"; +export interface SingleShardInfo { + cluster: number; + index: number; +} + export interface IRateLimitProof { proof: Uint8Array; merkleRoot: Uint8Array; @@ -38,7 +43,7 @@ export interface IMetaSetter { } export interface EncoderOptions { - pubsubTopic?: PubsubTopic; + pubsubTopicShardInfo?: SingleShardInfo; /** The content topic to set on outgoing messages. */ contentTopic: string; /** diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index dd81c484e2..5a9d733274 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -2,9 +2,9 @@ import type { Libp2p } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface/peer-id"; import type { Peer, PeerStore } from "@libp2p/interface/peer-store"; +import type { ShardInfo } from "./enr.js"; import type { CreateLibp2pOptions } from "./libp2p.js"; import type { IDecodedMessage } from "./message.js"; -import type { PubsubTopic } from "./misc.js"; export enum Protocols { Relay = "relay", @@ -23,9 +23,9 @@ export interface IBaseProtocol { export type ProtocolCreateOptions = { /** - * Waku supports usage of multiple pubsub topics, but this is still in early stages. - * Waku implements sharding to achieve scalability - * The format of the sharded topic is `/waku/2/rs//` + * Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future. + * The format to specify a shard is: + * clusterId: number, shards: number[] * To learn more about the sharding specifications implemented, see [Relay Sharding](https://rfc.vac.dev/spec/51/). * The Pubsub Topic to use. Defaults to {@link @waku/core!DefaultPubsubTopic }. * @@ -39,7 +39,7 @@ export type ProtocolCreateOptions = { * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. * */ - pubsubTopics?: PubsubTopic[]; + shardInfo?: ShardInfo; /** * You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property. * This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) diff --git a/packages/message-encryption/src/ecies.ts b/packages/message-encryption/src/ecies.ts index d9c56fea0b..af393cef14 100644 --- a/packages/message-encryption/src/ecies.ts +++ b/packages/message-encryption/src/ecies.ts @@ -7,10 +7,11 @@ import type { IMessage, IMetaSetter, IProtoMessage, - PubsubTopic + PubsubTopic, + SingleShardInfo } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; -import { Logger } from "@waku/utils"; +import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; import { generatePrivateKey } from "./crypto/utils.js"; import { DecodedMessage } from "./decoded_message.js"; @@ -98,7 +99,7 @@ export interface EncoderOptions extends BaseEncoderOptions { * in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/). */ export function createEncoder({ - pubsubTopic = DefaultPubsubTopic, + pubsubTopicShardInfo, contentTopic, publicKey, sigPrivKey, @@ -106,7 +107,9 @@ export function createEncoder({ metaSetter }: EncoderOptions): Encoder { return new Encoder( - pubsubTopic, + pubsubTopicShardInfo?.index + ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) + : DefaultPubsubTopic, contentTopic, publicKey, sigPrivKey, @@ -194,7 +197,13 @@ class Decoder extends DecoderV0 implements IDecoder { export function createDecoder( contentTopic: string, privateKey: Uint8Array, - pubsubTopic: PubsubTopic = DefaultPubsubTopic + pubsubTopicShardInfo?: SingleShardInfo ): Decoder { - return new Decoder(pubsubTopic, contentTopic, privateKey); + return new Decoder( + pubsubTopicShardInfo?.index + ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) + : DefaultPubsubTopic, + contentTopic, + privateKey + ); } diff --git a/packages/message-encryption/src/symmetric.ts b/packages/message-encryption/src/symmetric.ts index 2d2ccabbab..37f8a52cb2 100644 --- a/packages/message-encryption/src/symmetric.ts +++ b/packages/message-encryption/src/symmetric.ts @@ -7,10 +7,11 @@ import type { IMessage, IMetaSetter, IProtoMessage, - PubsubTopic + PubsubTopic, + SingleShardInfo } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; -import { Logger } from "@waku/utils"; +import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; import { generateSymmetricKey } from "./crypto/utils.js"; import { DecodedMessage } from "./decoded_message.js"; @@ -98,7 +99,7 @@ export interface EncoderOptions extends BaseEncoderOptions { * in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/). */ export function createEncoder({ - pubsubTopic = DefaultPubsubTopic, + pubsubTopicShardInfo, contentTopic, symKey, sigPrivKey, @@ -106,7 +107,9 @@ export function createEncoder({ metaSetter }: EncoderOptions): Encoder { return new Encoder( - pubsubTopic, + pubsubTopicShardInfo?.index + ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) + : DefaultPubsubTopic, contentTopic, symKey, sigPrivKey, @@ -194,7 +197,13 @@ class Decoder extends DecoderV0 implements IDecoder { export function createDecoder( contentTopic: string, symKey: Uint8Array, - pubsubTopic: PubsubTopic = DefaultPubsubTopic + pubsubTopicShardInfo?: SingleShardInfo ): Decoder { - return new Decoder(pubsubTopic, contentTopic, symKey); + return new Decoder( + pubsubTopicShardInfo?.index + ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) + : DefaultPubsubTopic, + contentTopic, + symKey + ); } diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 82f4fedbe7..8e7cce499e 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -25,7 +25,11 @@ import { SendError, SendResult } from "@waku/interfaces"; -import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils"; +import { + isWireSizeUnderCap, + shardInfoToPubsubTopics, + toAsyncIterator +} from "@waku/utils"; import { pushOrInitMapSet } from "@waku/utils"; import { Logger } from "@waku/utils"; @@ -68,7 +72,9 @@ class Relay implements IRelay { } this.gossipSub = libp2p.services.pubsub as GossipSub; - this.pubsubTopics = new Set(options?.pubsubTopics ?? [DefaultPubsubTopic]); + this.pubsubTopics = options?.shardInfo + ? new Set(shardInfoToPubsubTopics(options.shardInfo)) + : new Set([DefaultPubsubTopic]); if (this.gossipSub.isStarted()) { this.subscribeToAllTopics(); diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index 67a2ca81b0..c3ed95ead6 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -5,7 +5,6 @@ import { mplex } from "@libp2p/mplex"; import { webSockets } from "@libp2p/websockets"; import { all as filterAll } from "@libp2p/websockets/filters"; import { - DefaultPubsubTopic, DefaultUserAgent, wakuFilter, wakuLightPush, @@ -47,10 +46,6 @@ export async function createLightNode( ): Promise { options = options ?? {}; - if (!options.pubsubTopics) { - options.pubsubTopics = [DefaultPubsubTopic]; - } - const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { @@ -70,8 +65,8 @@ export async function createLightNode( return new WakuNode( options ?? {}, - options.pubsubTopics, libp2p, + options.shardInfo, store, lightPush, filter @@ -87,10 +82,6 @@ export async function createRelayNode( ): Promise { options = options ?? {}; - if (!options.pubsubTopics) { - options.pubsubTopics = [DefaultPubsubTopic]; - } - const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { @@ -108,8 +99,8 @@ export async function createRelayNode( return new WakuNode( options, - options.pubsubTopics, libp2p, + options.shardInfo, undefined, undefined, undefined, @@ -135,10 +126,6 @@ export async function createFullNode( ): Promise { options = options ?? {}; - if (!options.pubsubTopics) { - options.pubsubTopics = [DefaultPubsubTopic]; - } - const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { @@ -159,8 +146,8 @@ export async function createFullNode( return new WakuNode( options ?? {}, - options.pubsubTopics, libp2p, + options.shardInfo, store, lightPush, filter, diff --git a/packages/tests/src/node/dockerode.ts b/packages/tests/src/node/dockerode.ts index 6339d66e85..8942d98654 100644 --- a/packages/tests/src/node/dockerode.ts +++ b/packages/tests/src/node/dockerode.ts @@ -208,8 +208,16 @@ export function argsToArray(args: Args): Array { return "-" + capital.toLowerCase(); }); - const arg = `--${kebabKey}=${value}`; - array.push(arg); + if (Array.isArray(value)) { + // If the value is an array, create separate arguments for each element + value.forEach((val) => { + array.push(`--${kebabKey}=${val}`); + }); + } else { + // Handle non-array values as before + const arg = `--${kebabKey}=${value}`; + array.push(arg); + } } return array; diff --git a/packages/tests/src/node/interfaces.ts b/packages/tests/src/node/interfaces.ts index 253158b410..3a19a85134 100644 --- a/packages/tests/src/node/interfaces.ts +++ b/packages/tests/src/node/interfaces.ts @@ -14,7 +14,7 @@ export interface Args { peerExchange?: boolean; discv5Discovery?: boolean; storeMessageDbUrl?: string; - topic?: Array; + pubsubTopic?: Array; rpcPrivate?: boolean; websocketSupport?: boolean; tcpPort?: number; diff --git a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts index 429177cc92..a8401fadeb 100644 --- a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts @@ -1,11 +1,15 @@ -import { - createDecoder, - createEncoder, - DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; -import type { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; +import type { + IFilterSubscription, + LightNode, + ShardInfo, + SingleShardInfo +} from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; +import { + pubsubTopicToSingleShardInfo, + singleShardInfoToPubsubTopic +} from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -16,12 +20,7 @@ import { tearDownNodes } from "../../src/index.js"; -import { - runNodes, - TestContentTopic, - TestDecoder, - TestEncoder -} from "./utils.js"; +import { runNodes } from "./utils.js"; describe("Waku Filter V2: Multiple PubsubTopics", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level @@ -31,21 +30,41 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { let nwaku2: NimGoNode; let subscription: IFilterSubscription; let messageCollector: MessageCollector; - const customPubsubTopic = "/waku/2/custom-dapp/proto"; - const customContentTopic = "/test/2/waku-filter"; - const newEncoder = createEncoder({ - pubsubTopic: customPubsubTopic, - contentTopic: customContentTopic + + const customPubsubTopic1 = singleShardInfoToPubsubTopic({ + cluster: 3, + index: 1 }); - const newDecoder = createDecoder(customContentTopic, customPubsubTopic); + const customPubsubTopic2 = singleShardInfoToPubsubTopic({ + cluster: 3, + index: 2 + }); + const shardInfo: ShardInfo = { cluster: 3, indexList: [1, 2] }; + const singleShardInfo1: SingleShardInfo = { cluster: 3, index: 1 }; + const singleShardInfo2: SingleShardInfo = { cluster: 3, index: 2 }; + const customContentTopic1 = "/test/2/waku-filter"; + const customContentTopic2 = "/test/3/waku-filter"; + const customEncoder1 = createEncoder({ + pubsubTopicShardInfo: singleShardInfo1, + contentTopic: customContentTopic1 + }); + const customDecoder1 = createDecoder(customContentTopic1, singleShardInfo1); + const customEncoder2 = createEncoder({ + pubsubTopicShardInfo: singleShardInfo2, + contentTopic: customContentTopic2 + }); + const customDecoder2 = createDecoder(customContentTopic2, singleShardInfo2); this.beforeEach(async function () { this.timeout(15000); - [nwaku, waku] = await runNodes(this, [ - customPubsubTopic, - DefaultPubsubTopic - ]); - subscription = await waku.filter.createSubscription(customPubsubTopic); + [nwaku, waku] = await runNodes( + this, + [customPubsubTopic1, customPubsubTopic2], + shardInfo + ); + subscription = await waku.filter.createSubscription( + pubsubTopicToSingleShardInfo(customPubsubTopic1) + ); messageCollector = new MessageCollector(); }); @@ -55,94 +74,95 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { }); it("Subscribe and receive messages on custom pubsubtopic", async function () { - await subscription.subscribe([newDecoder], messageCollector.callback); - await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M1") }); + await subscription.subscribe([customDecoder1], messageCollector.callback); + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic, - expectedPubsubTopic: customPubsubTopic, + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: customPubsubTopic1, expectedMessageText: "M1" }); }); it("Subscribe and receive messages on 2 different pubsubtopics", async function () { - await subscription.subscribe([newDecoder], messageCollector.callback); + await subscription.subscribe([customDecoder1], messageCollector.callback); // Subscribe from the same lightnode to the 2nd pubsubtopic - const subscription2 = - await waku.filter.createSubscription(DefaultPubsubTopic); + const subscription2 = await waku.filter.createSubscription( + pubsubTopicToSingleShardInfo(customPubsubTopic2) + ); const messageCollector2 = new MessageCollector(); - await subscription2.subscribe([TestDecoder], messageCollector2.callback); + await subscription2.subscribe([customDecoder2], messageCollector2.callback); - await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); expect(await messageCollector.waitForMessages(1)).to.eq(true); expect(await messageCollector2.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic, - expectedPubsubTopic: customPubsubTopic, + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: customPubsubTopic1, expectedMessageText: "M1" }); messageCollector2.verifyReceivedMessage(0, { - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: DefaultPubsubTopic, + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: customPubsubTopic2, expectedMessageText: "M2" }); }); it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { - await subscription.subscribe([newDecoder], messageCollector.callback); + await subscription.subscribe([customDecoder1], messageCollector.callback); - // Set up and start a new nwaku node with Default Pubsubtopic + // Set up and start a new nwaku node with customPubsubTopic1 nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); await nwaku2.start({ filter: true, lightpush: true, relay: true, - topic: [DefaultPubsubTopic] + pubsubTopic: [customPubsubTopic2] }); await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic const subscription2 = await waku.filter.createSubscription( - DefaultPubsubTopic, + pubsubTopicToSingleShardInfo(customPubsubTopic2), await nwaku2.getPeerId() ); - await nwaku2.ensureSubscriptions([DefaultPubsubTopic]); + await nwaku2.ensureSubscriptions([customPubsubTopic2]); const messageCollector2 = new MessageCollector(); - await subscription2.subscribe([TestDecoder], messageCollector2.callback); + await subscription2.subscribe([customDecoder2], messageCollector2.callback); // Making sure that messages are send and reveiced for both subscriptions // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 while ( !(await messageCollector.waitForMessages(1, { - pubsubTopic: customPubsubTopic + pubsubTopic: customPubsubTopic1 })) || !(await messageCollector2.waitForMessages(1, { - pubsubTopic: DefaultPubsubTopic + pubsubTopic: customPubsubTopic2 })) ) { - await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); } messageCollector.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic, - expectedPubsubTopic: customPubsubTopic, + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: customPubsubTopic1, expectedMessageText: "M1" }); messageCollector2.verifyReceivedMessage(0, { - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: DefaultPubsubTopic, + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: customPubsubTopic2, expectedMessageText: "M2" }); }); @@ -150,7 +170,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () { // this subscription object is set up with the `customPubsubTopic` but we're passing it a Decoder with the `DefaultPubsubTopic` try { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await subscription.subscribe([customDecoder2], messageCollector.callback); } catch (error) { expect((error as Error).message).to.include( "Pubsub topic not configured" diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index a191264347..68b89e3d72 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -383,7 +383,7 @@ describe("Waku Filter V2: Subscribe", function () { await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); const subscription2 = await waku.filter.createSubscription( - DefaultPubsubTopic, + undefined, await nwaku2.getPeerId() ); await nwaku2.ensureSubscriptions([DefaultPubsubTopic]); diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 20f56e699f..d9cb4f0135 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -1,5 +1,15 @@ -import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; -import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces"; +import { + createDecoder, + createEncoder, + DefaultPubsubTopic, + waitForRemotePeer +} from "@waku/core"; +import { + IFilterSubscription, + LightNode, + Protocols, + ShardInfo +} from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { Logger } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; @@ -38,7 +48,9 @@ export async function validatePingError( export async function runNodes( context: Context, - pubsubTopics: string[] + //TODO: change this to use `ShardInfo` instead of `string[]` + pubsubTopics: string[], + shardInfo?: ShardInfo ): Promise<[NimGoNode, LightNode]> { const nwaku = new NimGoNode(makeLogFileName(context)); @@ -47,18 +59,24 @@ export async function runNodes( filter: true, lightpush: true, relay: true, - topic: pubsubTopics + pubsubTopic: pubsubTopics }, { retries: 3 } ); + const waku_options = { + staticNoiseKey: NOISE_KEY_1, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, + ...((pubsubTopics.length !== 1 || + pubsubTopics[0] !== DefaultPubsubTopic) && { + shardInfo: shardInfo + }) + }; + + log.info("Starting js waku node with :", JSON.stringify(waku_options)); let waku: LightNode | undefined; try { - waku = await createLightNode({ - pubsubTopics: pubsubTopics, - staticNoiseKey: NOISE_KEY_1, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } - }); + waku = await createLightNode(waku_options); await waku.start(); } catch (error) { log.error("jswaku node failed to start:", error); diff --git a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts index 6ab663e6ff..5017239c4a 100644 --- a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts @@ -1,10 +1,13 @@ import type { PeerId } from "@libp2p/interface/peer-id"; +import { createEncoder, waitForRemotePeer } from "@waku/core"; import { - createEncoder, - DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; -import { LightNode, Protocols, SendResult } from "@waku/interfaces"; + LightNode, + Protocols, + SendResult, + ShardInfo, + SingleShardInfo +} from "@waku/interfaces"; +import { singleShardInfoToPubsubTopic } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -15,12 +18,7 @@ import { tearDownNodes } from "../../src/index.js"; -import { - messageText, - runNodes, - TestContentTopic, - TestEncoder -} from "./utils.js"; +import { messageText, runNodes } from "./utils.js"; describe("Waku Light Push : Multiple PubsubTopics", function () { this.timeout(30000); @@ -28,20 +26,37 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { let nwaku: NimGoNode; let nwaku2: NimGoNode; let messageCollector: MessageCollector; - const customPubsubTopic = "/waku/2/custom-dapp/proto"; - const customContentTopic = "/test/2/waku-light-push/utf8"; - const customEncoder = createEncoder({ - contentTopic: customContentTopic, - pubsubTopic: customPubsubTopic + const customPubsubTopic1 = singleShardInfoToPubsubTopic({ + cluster: 3, + index: 1 }); + const customPubsubTopic2 = singleShardInfoToPubsubTopic({ + cluster: 3, + index: 2 + }); + const shardInfo: ShardInfo = { cluster: 3, indexList: [1, 2] }; + const singleShardInfo1: SingleShardInfo = { cluster: 3, index: 1 }; + const singleShardInfo2: SingleShardInfo = { cluster: 3, index: 2 }; + const customContentTopic1 = "/test/2/waku-light-push/utf8"; + const customContentTopic2 = "/test/3/waku-light-push/utf8"; + const customEncoder1 = createEncoder({ + pubsubTopicShardInfo: singleShardInfo1, + contentTopic: customContentTopic1 + }); + const customEncoder2 = createEncoder({ + pubsubTopicShardInfo: singleShardInfo2, + contentTopic: customContentTopic2 + }); + let nimPeerId: PeerId; this.beforeEach(async function () { this.timeout(15000); - [nwaku, waku] = await runNodes(this, [ - customPubsubTopic, - DefaultPubsubTopic - ]); + [nwaku, waku] = await runNodes( + this, + [customPubsubTopic1, customPubsubTopic2], + shardInfo + ); messageCollector = new MessageCollector(nwaku); nimPeerId = await nwaku.getPeerId(); }); @@ -52,7 +67,7 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { }); it("Push message on custom pubsubTopic", async function () { - const pushResponse = await waku.lightPush.send(customEncoder, { + const pushResponse = await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes(messageText) }); @@ -60,20 +75,20 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { expect( await messageCollector.waitForMessages(1, { - pubsubTopic: customPubsubTopic + pubsubTopic: customPubsubTopic1 }) ).to.eq(true); messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, - expectedContentTopic: customContentTopic + expectedContentTopic: customContentTopic1 }); }); it("Subscribe and receive messages on 2 different pubsubtopics", async function () { - const pushResponse1 = await waku.lightPush.send(customEncoder, { + const pushResponse1 = await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - const pushResponse2 = await waku.lightPush.send(TestEncoder, { + const pushResponse2 = await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); expect(pushResponse1.recipients[0].toString()).to.eq(nimPeerId.toString()); @@ -83,25 +98,25 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { expect( await messageCollector.waitForMessages(1, { - pubsubTopic: customPubsubTopic + pubsubTopic: customPubsubTopic1 }) ).to.eq(true); expect( await messageCollector2.waitForMessages(1, { - pubsubTopic: DefaultPubsubTopic + pubsubTopic: customPubsubTopic2 }) ).to.eq(true); messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", - expectedContentTopic: customContentTopic, - expectedPubsubTopic: customPubsubTopic + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: customPubsubTopic1 }); messageCollector2.verifyReceivedMessage(0, { expectedMessageText: "M2", - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: DefaultPubsubTopic + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: customPubsubTopic2 }); }); @@ -112,9 +127,9 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { filter: true, lightpush: true, relay: true, - topic: [DefaultPubsubTopic] + pubsubTopic: [customPubsubTopic2] }); - await nwaku2.ensureSubscriptions([DefaultPubsubTopic]); + await nwaku2.ensureSubscriptions([customPubsubTopic2]); await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.LightPush]); @@ -126,31 +141,31 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 while ( !(await messageCollector.waitForMessages(1, { - pubsubTopic: customPubsubTopic + pubsubTopic: customPubsubTopic1 })) || !(await messageCollector2.waitForMessages(1, { - pubsubTopic: DefaultPubsubTopic + pubsubTopic: customPubsubTopic2 })) || pushResponse1!.recipients[0].toString() === pushResponse2!.recipients[0].toString() ) { - pushResponse1 = await waku.lightPush.send(customEncoder, { + pushResponse1 = await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - pushResponse2 = await waku.lightPush.send(TestEncoder, { + pushResponse2 = await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); } messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", - expectedContentTopic: customContentTopic, - expectedPubsubTopic: customPubsubTopic + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: customPubsubTopic1 }); messageCollector2.verifyReceivedMessage(0, { expectedMessageText: "M2", - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: DefaultPubsubTopic + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: customPubsubTopic2 }); }); }); diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index 4d15e7e963..4f30082c09 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -1,5 +1,9 @@ -import { createEncoder, waitForRemotePeer } from "@waku/core"; -import { LightNode, Protocols } from "@waku/interfaces"; +import { + createEncoder, + DefaultPubsubTopic, + waitForRemotePeer +} from "@waku/core"; +import { LightNode, Protocols, ShardInfo } from "@waku/interfaces"; import { createLightNode, utf8ToBytes } from "@waku/sdk"; import { Logger } from "@waku/utils"; @@ -14,18 +18,22 @@ export const messagePayload = { payload: utf8ToBytes(messageText) }; export async function runNodes( context: Mocha.Context, - pubsubTopics: string[] + pubsubTopics: string[], + shardInfo?: ShardInfo ): Promise<[NimGoNode, LightNode]> { const nwaku = new NimGoNode(makeLogFileName(context)); await nwaku.start( - { lightpush: true, relay: true, topic: pubsubTopics }, + { lightpush: true, relay: true, pubsubTopic: pubsubTopics }, { retries: 3 } ); let waku: LightNode | undefined; try { waku = await createLightNode({ - pubsubTopics: pubsubTopics, + ...((pubsubTopics.length !== 1 || + pubsubTopics[0] !== DefaultPubsubTopic) && { + shardInfo: shardInfo + }), staticNoiseKey: NOISE_KEY_1 }); await waku.start(); diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index 3be71a6911..97bffe0cb9 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -1,11 +1,13 @@ import { + createDecoder, + createEncoder, DecodedMessage, - DefaultPubsubTopic, waitForRemotePeer } from "@waku/core"; -import { RelayNode } from "@waku/interfaces"; +import { RelayNode, ShardInfo, SingleShardInfo } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createRelayNode } from "@waku/sdk"; +import { singleShardInfoToPubsubTopic } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -16,16 +18,7 @@ import { NOISE_KEY_3, tearDownNodes } from "../../src/index.js"; - -import { - CustomContentTopic, - CustomDecoder, - CustomEncoder, - CustomPubsubTopic, - TestContentTopic, - TestDecoder, - TestEncoder -} from "./utils.js"; +import { TestDecoder } from "../filter/utils.js"; describe("Waku Relay, multiple pubsub topics", function () { this.timeout(15000); @@ -33,6 +26,38 @@ describe("Waku Relay, multiple pubsub topics", function () { let waku2: RelayNode; let waku3: RelayNode; + const customPubsubTopic1 = singleShardInfoToPubsubTopic({ + cluster: 3, + index: 1 + }); + const customPubsubTopic2 = singleShardInfoToPubsubTopic({ + cluster: 3, + index: 2 + }); + const shardInfo1: ShardInfo = { cluster: 3, indexList: [1] }; + const singleShardInfo1: SingleShardInfo = { + cluster: 3, + index: 1 + }; + const customContentTopic1 = "/test/2/waku-relay/utf8"; + const customContentTopic2 = "/test/3/waku-relay/utf8"; + const shardInfo2: ShardInfo = { cluster: 3, indexList: [2] }; + const singleShardInfo2: SingleShardInfo = { + cluster: 3, + index: 2 + }; + const customEncoder1 = createEncoder({ + pubsubTopicShardInfo: singleShardInfo1, + contentTopic: customContentTopic1 + }); + const customDecoder1 = createDecoder(customContentTopic1, singleShardInfo1); + const customEncoder2 = createEncoder({ + pubsubTopicShardInfo: singleShardInfo2, + contentTopic: customContentTopic2 + }); + const customDecoder2 = createDecoder(customContentTopic2, singleShardInfo2); + const shardInfoBothShards: ShardInfo = { cluster: 3, indexList: [1, 2] }; + afterEach(async function () { this.timeout(15000); await tearDownNodes([], [waku1, waku2, waku3]); @@ -40,14 +65,16 @@ describe("Waku Relay, multiple pubsub topics", function () { [ { - pubsub: CustomPubsubTopic, - encoder: CustomEncoder, - decoder: CustomDecoder + pubsub: customPubsubTopic1, + shardInfo: shardInfo1, + encoder: customEncoder1, + decoder: customDecoder1 }, { - pubsub: DefaultPubsubTopic, - encoder: TestEncoder, - decoder: TestDecoder + pubsub: customPubsubTopic2, + shardInfo: shardInfo2, + encoder: customEncoder2, + decoder: customDecoder2 } ].forEach((testItem) => { it(`3 nodes on ${testItem.pubsub} topic`, async function () { @@ -57,16 +84,16 @@ describe("Waku Relay, multiple pubsub topics", function () { [waku1, waku2, waku3] = await Promise.all([ createRelayNode({ - pubsubTopics: [testItem.pubsub], + shardInfo: testItem.shardInfo, staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - pubsubTopics: [testItem.pubsub], + shardInfo: testItem.shardInfo, staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - pubsubTopics: [testItem.pubsub], + shardInfo: testItem.shardInfo, staticNoiseKey: NOISE_KEY_3 }).then((waku) => waku.start().then(() => waku)) ]); @@ -155,16 +182,16 @@ describe("Waku Relay, multiple pubsub topics", function () { // Waku1 and waku2 are using multiple pubsub topis [waku1, waku2, waku3] = await Promise.all([ createRelayNode({ - pubsubTopics: [DefaultPubsubTopic, CustomPubsubTopic], + shardInfo: shardInfoBothShards, staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - pubsubTopics: [DefaultPubsubTopic, CustomPubsubTopic], + shardInfo: shardInfoBothShards, staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - pubsubTopics: [DefaultPubsubTopic], + shardInfo: shardInfo1, staticNoiseKey: NOISE_KEY_3 }).then((waku) => waku.start().then(() => waku)) ]); @@ -187,45 +214,45 @@ describe("Waku Relay, multiple pubsub topics", function () { ]); await waku1.relay.subscribe( - [TestDecoder, CustomDecoder], + [customDecoder1, customDecoder2], msgCollector1.callback ); await waku2.relay.subscribe( - [TestDecoder, CustomDecoder], + [customDecoder1, customDecoder2], msgCollector2.callback ); - await waku3.relay.subscribe([TestDecoder], msgCollector3.callback); + await waku3.relay.subscribe([customDecoder1], msgCollector3.callback); // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network - // However onlt waku1 and waku2 are receiving messages on the CustomPubsubTopic - await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") }); - await waku1.relay.send(CustomEncoder, { payload: utf8ToBytes("M2") }); - await waku2.relay.send(TestEncoder, { payload: utf8ToBytes("M3") }); - await waku2.relay.send(CustomEncoder, { payload: utf8ToBytes("M4") }); - await waku3.relay.send(TestEncoder, { payload: utf8ToBytes("M5") }); - await waku3.relay.send(CustomEncoder, { payload: utf8ToBytes("M6") }); + // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic + await waku1.relay.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku1.relay.send(customEncoder2, { payload: utf8ToBytes("M2") }); + await waku2.relay.send(customEncoder1, { payload: utf8ToBytes("M3") }); + await waku2.relay.send(customEncoder2, { payload: utf8ToBytes("M4") }); + await waku3.relay.send(customEncoder1, { payload: utf8ToBytes("M5") }); + await waku3.relay.send(customEncoder2, { payload: utf8ToBytes("M6") }); expect(await msgCollector1.waitForMessages(3, { exact: true })).to.eq(true); expect(await msgCollector2.waitForMessages(3, { exact: true })).to.eq(true); expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(true); - expect(msgCollector1.hasMessage(TestContentTopic, "M3")).to.eq(true); - expect(msgCollector1.hasMessage(CustomContentTopic, "M4")).to.eq(true); - expect(msgCollector1.hasMessage(TestContentTopic, "M5")).to.eq(true); - expect(msgCollector2.hasMessage(TestContentTopic, "M1")).to.eq(true); - expect(msgCollector2.hasMessage(CustomContentTopic, "M2")).to.eq(true); - expect(msgCollector2.hasMessage(TestContentTopic, "M5")).to.eq(true); - expect(msgCollector3.hasMessage(TestContentTopic, "M1")).to.eq(true); - expect(msgCollector3.hasMessage(TestContentTopic, "M3")).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic1, "M3")).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic2, "M4")).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic1, "M5")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic1, "M1")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic2, "M2")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic1, "M5")).to.eq(true); + expect(msgCollector3.hasMessage(customContentTopic1, "M1")).to.eq(true); + expect(msgCollector3.hasMessage(customContentTopic1, "M3")).to.eq(true); }); it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () { [waku1, waku2, waku3] = await Promise.all([ createRelayNode({ - pubsubTopics: [CustomPubsubTopic], + shardInfo: shardInfo1, staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - pubsubTopics: [CustomPubsubTopic], + shardInfo: shardInfo1, staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)), @@ -254,7 +281,7 @@ describe("Waku Relay, multiple pubsub topics", function () { const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - void waku2.relay.subscribe([CustomDecoder], resolve); + void waku2.relay.subscribe([customDecoder1], resolve); } ); @@ -267,7 +294,7 @@ describe("Waku Relay, multiple pubsub topics", function () { } ); - await waku1.relay.send(CustomEncoder, { + await waku1.relay.send(customEncoder1, { payload: utf8ToBytes(messageText) }); @@ -275,6 +302,6 @@ describe("Waku Relay, multiple pubsub topics", function () { await waku3NoMsgPromise; expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText); - expect(waku2ReceivedMsg.pubsubTopic).to.eq(CustomPubsubTopic); + expect(waku2ReceivedMsg.pubsubTopic).to.eq(customPubsubTopic1); }); }); diff --git a/packages/tests/tests/relay/publish.node.spec.ts b/packages/tests/tests/relay/publish.node.spec.ts index fbafb0cdd3..5a243397f6 100644 --- a/packages/tests/tests/relay/publish.node.spec.ts +++ b/packages/tests/tests/relay/publish.node.spec.ts @@ -1,4 +1,4 @@ -import { createEncoder, DefaultPubsubTopic } from "@waku/core"; +import { createEncoder } from "@waku/core"; import { IRateLimitProof, RelayNode, SendError } from "@waku/interfaces"; import { createRelayNode } from "@waku/sdk"; import { utf8ToBytes } from "@waku/utils/bytes"; @@ -34,11 +34,9 @@ describe("Waku Relay, Publish", function () { log.info("Starting JS Waku instances"); [waku1, waku2] = await Promise.all([ createRelayNode({ - pubsubTopics: [DefaultPubsubTopic], staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - pubsubTopics: [DefaultPubsubTopic], staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)) @@ -130,7 +128,7 @@ describe("Waku Relay, Publish", function () { it("Fails to publish message with wrong pubsubtopic", async function () { const wrong_encoder = createEncoder({ - pubsubTopic: "wrong", + pubsubTopicShardInfo: { cluster: 3, index: 1 }, contentTopic: TestContentTopic }); const pushResponse = await waku1.relay.send(wrong_encoder, { diff --git a/packages/tests/tests/relay/subscribe.node.spec.ts b/packages/tests/tests/relay/subscribe.node.spec.ts index ac4d58927b..d2c5fd59b9 100644 --- a/packages/tests/tests/relay/subscribe.node.spec.ts +++ b/packages/tests/tests/relay/subscribe.node.spec.ts @@ -33,11 +33,9 @@ describe("Waku Relay, Subscribe", function () { log.info("Starting JS Waku instances"); [waku1, waku2] = await Promise.all([ createRelayNode({ - pubsubTopics: [DefaultPubsubTopic], staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - pubsubTopics: [DefaultPubsubTopic], staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)) diff --git a/packages/tests/tests/relay/utils.ts b/packages/tests/tests/relay/utils.ts index 8cc7819a00..e35110fd67 100644 --- a/packages/tests/tests/relay/utils.ts +++ b/packages/tests/tests/relay/utils.ts @@ -6,17 +6,6 @@ export const messageText = "Relay works!"; export const TestContentTopic = "/test/1/waku-relay/utf8"; export const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); export const TestDecoder = createDecoder(TestContentTopic); -export const CustomContentTopic = "/test/2/waku-relay/utf8"; -export const CustomPubsubTopic = "/some/pubsub/topic"; -export const CustomEncoder = createEncoder({ - contentTopic: CustomContentTopic, - pubsubTopic: CustomPubsubTopic -}); -export const CustomDecoder = createDecoder( - CustomContentTopic, - CustomPubsubTopic -); - export const log = new Logger("test:relay"); export async function waitForAllRemotePeers( diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index ad2c268b99..13fe53d68d 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -1,7 +1,8 @@ import { bootstrap } from "@libp2p/bootstrap"; import type { PeerId } from "@libp2p/interface/peer-id"; import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; -import { createLightNode, LightNode, Tags } from "@waku/sdk"; +import { createLightNode, LightNode, ShardInfo, Tags } from "@waku/sdk"; +import { singleShardInfoToPubsubTopic } from "@waku/utils"; import chai, { expect } from "chai"; import chaiAsPromised from "chai-as-promised"; import Sinon, { SinonSpy } from "sinon"; @@ -38,10 +39,13 @@ describe("Static Sharding: Peer Management", function () { it("all px service nodes subscribed to the shard topic should be dialed", async function () { this.timeout(100_000); - const pubsubTopics = ["/waku/2/rs/18/2"]; + const pubsubTopics = [ + singleShardInfoToPubsubTopic({ cluster: 18, index: 2 }) + ]; + const shardInfo: ShardInfo = { cluster: 18, indexList: [2] }; await nwaku1.start({ - topic: pubsubTopics, + pubsubTopic: pubsubTopics, discv5Discovery: true, peerExchange: true, relay: true @@ -50,7 +54,7 @@ describe("Static Sharding: Peer Management", function () { const enr1 = (await nwaku1.info()).enrUri; await nwaku2.start({ - topic: pubsubTopics, + pubsubTopic: pubsubTopics, discv5Discovery: true, peerExchange: true, discv5BootstrapNode: enr1, @@ -60,7 +64,7 @@ describe("Static Sharding: Peer Management", function () { const enr2 = (await nwaku2.info()).enrUri; await nwaku3.start({ - topic: pubsubTopics, + pubsubTopic: pubsubTopics, discv5Discovery: true, peerExchange: true, discv5BootstrapNode: enr2, @@ -69,7 +73,7 @@ describe("Static Sharding: Peer Management", function () { const nwaku3Ma = await nwaku3.getMultiaddrWithId(); waku = await createLightNode({ - pubsubTopics, + shardInfo: shardInfo, libp2p: { peerDiscovery: [ bootstrap({ list: [nwaku3Ma.toString()] }), @@ -107,12 +111,17 @@ describe("Static Sharding: Peer Management", function () { it("px service nodes not subscribed to the shard should not be dialed", async function () { this.timeout(100_000); - const pubsubTopicsToDial = ["/waku/2/rs/18/2"]; - const pubsubTopicsToIgnore = ["/waku/2/rs/18/3"]; + const pubsubTopicsToDial = [ + singleShardInfoToPubsubTopic({ cluster: 18, index: 2 }) + ]; + const shardInfoToDial: ShardInfo = { cluster: 18, indexList: [2] }; + const pubsubTopicsToIgnore = [ + singleShardInfoToPubsubTopic({ cluster: 18, index: 1 }) + ]; // this service node is not subscribed to the shard await nwaku1.start({ - topic: pubsubTopicsToIgnore, + pubsubTopic: pubsubTopicsToIgnore, relay: true, discv5Discovery: true, peerExchange: true @@ -121,7 +130,7 @@ describe("Static Sharding: Peer Management", function () { const enr1 = (await nwaku1.info()).enrUri; await nwaku2.start({ - topic: pubsubTopicsToDial, + pubsubTopic: pubsubTopicsToDial, relay: true, discv5Discovery: true, peerExchange: true, @@ -139,7 +148,7 @@ describe("Static Sharding: Peer Management", function () { const nwaku3Ma = await nwaku3.getMultiaddrWithId(); waku = await createLightNode({ - pubsubTopics: pubsubTopicsToDial, + shardInfo: shardInfoToDial, libp2p: { peerDiscovery: [ bootstrap({ list: [nwaku3Ma.toString()] }), diff --git a/packages/tests/tests/sharding/running_nodes.spec.ts b/packages/tests/tests/sharding/running_nodes.spec.ts index 7cab303228..6fedb6da02 100644 --- a/packages/tests/tests/sharding/running_nodes.spec.ts +++ b/packages/tests/tests/sharding/running_nodes.spec.ts @@ -1,14 +1,24 @@ -import { LightNode } from "@waku/interfaces"; +import { LightNode, ShardInfo, SingleShardInfo } from "@waku/interfaces"; import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; +import { singleShardInfoToPubsubTopic } from "@waku/utils"; import { expect } from "chai"; import { tearDownNodes } from "../../src/index.js"; import { makeLogFileName } from "../../src/log_file.js"; import { NimGoNode } from "../../src/node/node.js"; -const PubsubTopic1 = "/waku/2/rs/0/2"; -const PubsubTopic2 = "/waku/2/rs/0/3"; - +const PubsubTopic1 = singleShardInfoToPubsubTopic({ + cluster: 0, + index: 2 +}); +const PubsubTopic2 = singleShardInfoToPubsubTopic({ + cluster: 0, + index: 3 +}); +const shardInfoFirstShard: ShardInfo = { cluster: 0, indexList: [2] }; +const shardInfoBothShards: ShardInfo = { cluster: 0, indexList: [2, 3] }; +const singleShardInfo1: SingleShardInfo = { cluster: 0, index: 2 }; +const singleShardInfo2: SingleShardInfo = { cluster: 0, index: 3 }; const ContentTopic = "/waku/2/content/test.js"; describe("Static Sharding: Running Nodes", () => { @@ -29,17 +39,17 @@ describe("Static Sharding: Running Nodes", () => { it("configure the node with multiple pubsub topics", async function () { this.timeout(15_000); waku = await createLightNode({ - pubsubTopics: [PubsubTopic1, PubsubTopic2] + shardInfo: shardInfoBothShards }); const encoder1 = createEncoder({ contentTopic: ContentTopic, - pubsubTopic: PubsubTopic1 + pubsubTopicShardInfo: singleShardInfo1 }); const encoder2 = createEncoder({ contentTopic: ContentTopic, - pubsubTopic: PubsubTopic2 + pubsubTopicShardInfo: singleShardInfo2 }); const request1 = await waku.lightPush.send(encoder1, { @@ -57,13 +67,13 @@ describe("Static Sharding: Running Nodes", () => { it("using a protocol with unconfigured pubsub topic should fail", async function () { this.timeout(15_000); waku = await createLightNode({ - pubsubTopics: [PubsubTopic1] + shardInfo: shardInfoFirstShard }); // use a pubsub topic that is not configured const encoder = createEncoder({ contentTopic: ContentTopic, - pubsubTopic: PubsubTopic2 + pubsubTopicShardInfo: singleShardInfo2 }); try { diff --git a/packages/tests/tests/store/cursor.node.spec.ts b/packages/tests/tests/store/cursor.node.spec.ts index 882475dc17..cddf6602af 100644 --- a/packages/tests/tests/store/cursor.node.spec.ts +++ b/packages/tests/tests/store/cursor.node.spec.ts @@ -6,7 +6,7 @@ import { expect } from "chai"; import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js"; import { - customPubsubTopic, + customShardedPubsubTopic1, sendMessages, startAndConnectLightNode, TestContentTopic, @@ -179,7 +179,7 @@ describe("Waku Store, cursor", function () { messages.push(msg as DecodedMessage); } } - messages[5].pubsubTopic = customPubsubTopic; + messages[5].pubsubTopic = customShardedPubsubTopic1; const cursor = await createCursor(messages[5]); try { @@ -193,7 +193,7 @@ describe("Waku Store, cursor", function () { if ( !(err instanceof Error) || !err.message.includes( - `Cursor pubsub topic (${customPubsubTopic}) does not match decoder pubsub topic (${DefaultPubsubTopic})` + `Cursor pubsub topic (${customShardedPubsubTopic1}) does not match decoder pubsub topic (${DefaultPubsubTopic})` ) ) { throw err; diff --git a/packages/tests/tests/store/error_handling.node.spec.ts b/packages/tests/tests/store/error_handling.node.spec.ts index e3ddd62ea4..e18f9bcc8f 100644 --- a/packages/tests/tests/store/error_handling.node.spec.ts +++ b/packages/tests/tests/store/error_handling.node.spec.ts @@ -5,8 +5,8 @@ import { expect } from "chai"; import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js"; import { - customPubsubTopic, - customTestDecoder, + customDecoder1, + customShardedPubsubTopic1, processQueriedMessages, startAndConnectLightNode, TestDecoder @@ -33,7 +33,7 @@ describe("Waku Store, error handling", function () { it("Query Generator, Wrong PubsubTopic", async function () { try { for await (const msgPromises of waku.store.queryGenerator([ - customTestDecoder + customDecoder1 ])) { msgPromises; } @@ -42,7 +42,7 @@ describe("Waku Store, error handling", function () { if ( !(err instanceof Error) || !err.message.includes( - `Pubsub topic ${customPubsubTopic} has not been configured on this instance. Configured topics are: ${DefaultPubsubTopic}` + `Pubsub topic ${customShardedPubsubTopic1} has not been configured on this instance. Configured topics are: ${DefaultPubsubTopic}` ) ) { throw err; @@ -54,7 +54,7 @@ describe("Waku Store, error handling", function () { try { for await (const msgPromises of waku.store.queryGenerator([ TestDecoder, - customTestDecoder + customDecoder1 ])) { msgPromises; } @@ -99,7 +99,7 @@ describe("Waku Store, error handling", function () { it("Query with Ordered Callback, Wrong PubsubTopic", async function () { try { await waku.store.queryWithOrderedCallback( - [customTestDecoder], + [customDecoder1], async () => {} ); throw new Error("QueryGenerator was successful but was expected to fail"); @@ -107,7 +107,7 @@ describe("Waku Store, error handling", function () { if ( !(err instanceof Error) || !err.message.includes( - `Pubsub topic ${customPubsubTopic} has not been configured on this instance. Configured topics are: ${DefaultPubsubTopic}` + `Pubsub topic ${customShardedPubsubTopic1} has not been configured on this instance. Configured topics are: ${DefaultPubsubTopic}` ) ) { throw err; @@ -118,7 +118,7 @@ describe("Waku Store, error handling", function () { it("Query with Ordered Callback, Multiple PubsubTopics", async function () { try { await waku.store.queryWithOrderedCallback( - [TestDecoder, customTestDecoder], + [TestDecoder, customDecoder1], async () => {} ); throw new Error("QueryGenerator was successful but was expected to fail"); @@ -159,7 +159,7 @@ describe("Waku Store, error handling", function () { it("Query with Promise Callback, Wrong PubsubTopic", async function () { try { await waku.store.queryWithPromiseCallback( - [customTestDecoder], + [customDecoder1], async () => {} ); throw new Error("QueryGenerator was successful but was expected to fail"); @@ -167,7 +167,7 @@ describe("Waku Store, error handling", function () { if ( !(err instanceof Error) || !err.message.includes( - `Pubsub topic ${customPubsubTopic} has not been configured on this instance. Configured topics are: ${DefaultPubsubTopic}` + `Pubsub topic ${customShardedPubsubTopic1} has not been configured on this instance. Configured topics are: ${DefaultPubsubTopic}` ) ) { throw err; @@ -178,7 +178,7 @@ describe("Waku Store, error handling", function () { it("Query with Promise Callback, Multiple PubsubTopics", async function () { try { await waku.store.queryWithPromiseCallback( - [TestDecoder, customTestDecoder], + [TestDecoder, customDecoder1], async () => {} ); throw new Error("QueryGenerator was successful but was expected to fail"); diff --git a/packages/tests/tests/store/index.node.spec.ts b/packages/tests/tests/store/index.node.spec.ts index b446bd7829..c2347e54e0 100644 --- a/packages/tests/tests/store/index.node.spec.ts +++ b/packages/tests/tests/store/index.node.spec.ts @@ -33,7 +33,7 @@ import { } from "../../src/index.js"; import { - customContentTopic, + customContentTopic1, log, messageText, processQueriedMessages, @@ -45,7 +45,7 @@ import { totalMsgs } from "./utils.js"; -const secondDecoder = createDecoder(customContentTopic, DefaultPubsubTopic); +const secondDecoder = createDecoder(customContentTopic1); describe("Waku Store, general", function () { this.timeout(15000); @@ -124,7 +124,7 @@ describe("Waku Store, general", function () { await nwaku.sendMessage( NimGoNode.toMessageRpcQuery({ payload: utf8ToBytes("M2"), - contentTopic: customContentTopic + contentTopic: customContentTopic1 }), DefaultPubsubTopic ); @@ -137,7 +137,7 @@ describe("Waku Store, general", function () { DefaultPubsubTopic ); expect(messageCollector.hasMessage(TestContentTopic, "M1")).to.eq(true); - expect(messageCollector.hasMessage(customContentTopic, "M2")).to.eq(true); + expect(messageCollector.hasMessage(customContentTopic1, "M2")).to.eq(true); }); it("Query generator for multiple messages with different content topic format", async function () { diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index 6a2af659b4..d476ebf0b3 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -1,4 +1,4 @@ -import { DefaultPubsubTopic, waitForRemotePeer } from "@waku/core"; +import { waitForRemotePeer } from "@waku/core"; import type { IMessage, LightNode } from "@waku/interfaces"; import { createLightNode, Protocols } from "@waku/sdk"; import { expect } from "chai"; @@ -11,14 +11,17 @@ import { } from "../../src/index.js"; import { - customContentTopic, - customPubsubTopic, - customTestDecoder, + customContentTopic1, + customContentTopic2, + customDecoder1, + customDecoder2, + customShardedPubsubTopic1, + customShardedPubsubTopic2, processQueriedMessages, sendMessages, + shardInfo1, + shardInfoBothShards, startAndConnectLightNode, - TestContentTopic, - TestDecoder, totalMsgs } from "./utils.js"; @@ -33,10 +36,13 @@ describe("Waku Store, custom pubsub topic", function () { nwaku = new NimGoNode(makeLogFileName(this)); await nwaku.start({ store: true, - topic: [customPubsubTopic, DefaultPubsubTopic], + pubsubTopic: [customShardedPubsubTopic1, customShardedPubsubTopic2], relay: true }); - await nwaku.ensureSubscriptions([customPubsubTopic, DefaultPubsubTopic]); + await nwaku.ensureSubscriptions([ + customShardedPubsubTopic1, + customShardedPubsubTopic2 + ]); }); afterEach(async function () { @@ -45,12 +51,17 @@ describe("Waku Store, custom pubsub topic", function () { }); it("Generator, custom pubsub topic", async function () { - await sendMessages(nwaku, totalMsgs, customContentTopic, customPubsubTopic); - waku = await startAndConnectLightNode(nwaku, [customPubsubTopic]); + await sendMessages( + nwaku, + totalMsgs, + customContentTopic1, + customShardedPubsubTopic1 + ); + waku = await startAndConnectLightNode(nwaku, [], shardInfo1); const messages = await processQueriedMessages( waku, - [customTestDecoder], - customPubsubTopic + [customDecoder1], + customShardedPubsubTopic1 ); expect(messages?.length).eq(totalMsgs); @@ -64,18 +75,25 @@ describe("Waku Store, custom pubsub topic", function () { this.timeout(10000); const totalMsgs = 10; - await sendMessages(nwaku, totalMsgs, customContentTopic, customPubsubTopic); - await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubsubTopic); + await sendMessages( + nwaku, + totalMsgs, + customContentTopic1, + customShardedPubsubTopic1 + ); + await sendMessages( + nwaku, + totalMsgs, + customContentTopic2, + customShardedPubsubTopic2 + ); - waku = await startAndConnectLightNode(nwaku, [ - customPubsubTopic, - DefaultPubsubTopic - ]); + waku = await startAndConnectLightNode(nwaku, [], shardInfoBothShards); const customMessages = await processQueriedMessages( waku, - [customTestDecoder], - customPubsubTopic + [customDecoder1], + customShardedPubsubTopic1 ); expect(customMessages?.length).eq(totalMsgs); const result1 = customMessages?.findIndex((msg) => { @@ -85,8 +103,8 @@ describe("Waku Store, custom pubsub topic", function () { const testMessages = await processQueriedMessages( waku, - [TestDecoder], - DefaultPubsubTopic + [customDecoder2], + customShardedPubsubTopic2 ); expect(testMessages?.length).eq(totalMsgs); const result2 = testMessages?.findIndex((msg) => { @@ -102,18 +120,28 @@ describe("Waku Store, custom pubsub topic", function () { nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); await nwaku2.start({ store: true, - topic: [DefaultPubsubTopic], + pubsubTopic: [customShardedPubsubTopic2], relay: true }); - await nwaku2.ensureSubscriptions([DefaultPubsubTopic]); + await nwaku2.ensureSubscriptions([customShardedPubsubTopic2]); const totalMsgs = 10; - await sendMessages(nwaku, totalMsgs, customContentTopic, customPubsubTopic); - await sendMessages(nwaku2, totalMsgs, TestContentTopic, DefaultPubsubTopic); + await sendMessages( + nwaku, + totalMsgs, + customContentTopic1, + customShardedPubsubTopic1 + ); + await sendMessages( + nwaku2, + totalMsgs, + customContentTopic2, + customShardedPubsubTopic2 + ); waku = await createLightNode({ staticNoiseKey: NOISE_KEY_1, - pubsubTopics: [customPubsubTopic, DefaultPubsubTopic] + shardInfo: shardInfoBothShards }); await waku.start(); @@ -130,13 +158,13 @@ describe("Waku Store, custom pubsub topic", function () { ) { customMessages = await processQueriedMessages( waku, - [customTestDecoder], - customPubsubTopic + [customDecoder1], + customShardedPubsubTopic1 ); testMessages = await processQueriedMessages( waku, - [TestDecoder], - DefaultPubsubTopic + [customDecoder2], + customShardedPubsubTopic2 ); } }); diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts index c742018399..1bc5ba4fe9 100644 --- a/packages/tests/tests/store/utils.ts +++ b/packages/tests/tests/store/utils.ts @@ -6,9 +6,9 @@ import { DefaultPubsubTopic, waitForRemotePeer } from "@waku/core"; -import { LightNode, Protocols } from "@waku/interfaces"; +import { LightNode, Protocols, ShardInfo } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; -import { Logger } from "@waku/utils"; +import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; import { expect } from "chai"; import { delay, NimGoNode, NOISE_KEY_1 } from "../../src"; @@ -18,12 +18,26 @@ export const log = new Logger("test:store"); export const TestContentTopic = "/test/1/waku-store/utf8"; export const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); export const TestDecoder = createDecoder(TestContentTopic); -export const customContentTopic = "/test/2/waku-store/utf8"; -export const customPubsubTopic = "/waku/2/custom-dapp/proto"; -export const customTestDecoder = createDecoder( - customContentTopic, - customPubsubTopic -); +export const customShardedPubsubTopic1 = singleShardInfoToPubsubTopic({ + cluster: 3, + index: 1 +}); +export const customShardedPubsubTopic2 = singleShardInfoToPubsubTopic({ + cluster: 3, + index: 2 +}); +export const shardInfo1: ShardInfo = { cluster: 3, indexList: [1] }; +export const customContentTopic1 = "/test/2/waku-store/utf8"; +export const customContentTopic2 = "/test/3/waku-store/utf8"; +export const customDecoder1 = createDecoder(customContentTopic1, { + cluster: 3, + index: 1 +}); +export const customDecoder2 = createDecoder(customContentTopic2, { + cluster: 3, + index: 2 +}); +export const shardInfoBothShards: ShardInfo = { cluster: 3, indexList: [1, 2] }; export const totalMsgs = 20; export const messageText = "Store Push works!"; @@ -66,10 +80,14 @@ export async function processQueriedMessages( export async function startAndConnectLightNode( instance: NimGoNode, - pubsubTopics: string[] = [DefaultPubsubTopic] + pubsubTopics: string[] = [DefaultPubsubTopic], + shardInfo?: ShardInfo ): Promise { const waku = await createLightNode({ - pubsubTopics: pubsubTopics, + ...((pubsubTopics.length !== 1 || + pubsubTopics[0] !== DefaultPubsubTopic) && { + shardInfo: shardInfo + }), staticNoiseKey: NOISE_KEY_1 }); await waku.start(); diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index e935ed11c6..afaecb2b67 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -1,16 +1,41 @@ import { sha256 } from "@noble/hashes/sha256"; -import type { PubsubTopic, ShardInfo } from "@waku/interfaces"; +import type { PubsubTopic, ShardInfo, SingleShardInfo } from "@waku/interfaces"; import { concat, utf8ToBytes } from "../bytes/index.js"; +export const singleShardInfoToPubsubTopic = ( + shardInfo: SingleShardInfo +): PubsubTopic => { + if (shardInfo.cluster === undefined || shardInfo.index === undefined) + throw new Error("Invalid shard"); + + return `/waku/2/rs/${shardInfo.cluster}/${shardInfo.index}`; +}; + export const shardInfoToPubsubTopics = ( shardInfo: ShardInfo ): PubsubTopic[] => { + if (shardInfo.cluster === undefined || shardInfo.indexList === undefined) + throw new Error("Invalid shard"); + return shardInfo.indexList.map( (index) => `/waku/2/rs/${shardInfo.cluster}/${index}` ); }; +export const pubsubTopicToSingleShardInfo = ( + pubsubTopics: PubsubTopic +): SingleShardInfo => { + const parts = pubsubTopics.split("/"); + if (parts.length != 6) throw new Error("Invalid pubsub topic"); + + const cluster = parseInt(parts[4]); + const index = parseInt(parts[5]); + if (isNaN(cluster) || isNaN(index)) throw new Error("Invalid pubsub topic"); + + return { cluster, index }; +}; + export function ensurePubsubTopicIsConfigured( pubsubTopic: PubsubTopic, configuredTopics: PubsubTopic[]