diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 04e37bd19a..f51901a109 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -1,5 +1,4 @@ export { DefaultUserAgent } from "./lib/waku.js"; -export { DefaultPubsubTopic } from "./lib/constants.js"; export { createEncoder, createDecoder } from "./lib/message/version_0.js"; export type { Encoder, diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index f6fcd4663c..22d2ded86e 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -6,12 +6,12 @@ import type { IBaseProtocol, Libp2pComponents, PubsubTopic, - ShardInfo + ShardingParams } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { shardInfoToPubsubTopics } from "@waku/utils"; import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p"; -import { DefaultPubsubTopic } from "./constants.js"; import { filterPeers } from "./filterPeers.js"; import { StreamManager } from "./stream_manager.js"; @@ -97,7 +97,7 @@ export class BaseProtocol implements IBaseProtocol { return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers); } - initializePubsubTopic(shardInfo?: ShardInfo): PubsubTopic[] { + initializePubsubTopic(shardInfo?: ShardingParams): PubsubTopic[] { return shardInfo ? shardInfoToPubsubTopics(shardInfo) : [DefaultPubsubTopic]; diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 93c365dcd8..c54d1df39a 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -17,6 +17,7 @@ import type { SingleShardInfo, Unsubscribe } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { ensurePubsubTopicIsConfigured, @@ -30,7 +31,6 @@ import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; import { BaseProtocol } from "../base_protocol.js"; -import { DefaultPubsubTopic } from "../constants.js"; import { FilterPushRpc, diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index bb37c50358..a55b82059f 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -11,9 +11,7 @@ import type { SingleShardInfo } from "@waku/interfaces"; import { proto_message as proto } from "@waku/proto"; -import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; - -import { DefaultPubsubTopic } from "../constants.js"; +import { determinePubsubTopic, Logger } from "@waku/utils"; const log = new Logger("message:version-0"); const OneMillion = BigInt(1_000_000); @@ -128,9 +126,7 @@ export function createEncoder({ return new Encoder( contentTopic, ephemeral, - pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic, + determinePubsubTopic(contentTopic, pubsubTopicShardInfo), metaSetter ); } @@ -193,9 +189,7 @@ export function createDecoder( pubsubTopicShardInfo?: SingleShardInfo ): Decoder { return new Decoder( - pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic, + determinePubsubTopic(contentTopic, pubsubTopicShardInfo), contentTopic ); } diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index b3c6892ab0..9f95b5ba24 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -1,7 +1,12 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import { IncomingStreamData } from "@libp2p/interface/stream-handler"; import { encodeRelayShard } from "@waku/enr"; -import type { IMetadata, Libp2pComponents, ShardInfo } from "@waku/interfaces"; +import type { + IMetadata, + Libp2pComponents, + ShardInfo, + ShardingParams +} from "@waku/interfaces"; import { proto_metadata } from "@waku/proto"; import { Logger } from "@waku/utils"; import all from "it-all"; @@ -16,9 +21,9 @@ const log = new Logger("metadata"); export const MetadataCodec = "/vac/waku/metadata/1.0.0"; class Metadata extends BaseProtocol { - private readonly shardInfo: ShardInfo; + private readonly shardInfo: ShardingParams; private libp2pComponents: Libp2pComponents; - constructor(shardInfo: ShardInfo, libp2p: Libp2pComponents) { + constructor(shardInfo: ShardingParams, libp2p: Libp2pComponents) { super(MetadataCodec, libp2p.components); this.libp2pComponents = libp2p; this.shardInfo = shardInfo; @@ -99,7 +104,7 @@ class Metadata extends BaseProtocol { } export function wakuMetadata( - shardInfo: ShardInfo + shardInfo: ShardingParams ): (components: Libp2pComponents) => IMetadata { return (components: Libp2pComponents) => new Metadata(shardInfo, components); } diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index 4d6b7fa038..c452cf6db6 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -8,14 +8,13 @@ import type { IStore, Libp2p, PubsubTopic, - ShardInfo, + ShardingParams, Waku } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; +import { DefaultPubsubTopic, Protocols } from "@waku/interfaces"; import { Logger, shardInfoToPubsubTopics } from "@waku/utils"; import { ConnectionManager } from "./connection_manager.js"; -import { DefaultPubsubTopic } from "./constants.js"; export const DefaultPingKeepAliveValueSecs = 5 * 60; export const DefaultRelayKeepAliveValueSecs = 5 * 60; @@ -57,7 +56,7 @@ export class WakuNode implements Waku { constructor( options: WakuOptions, libp2p: Libp2p, - pubsubShardInfo?: ShardInfo, + pubsubShardInfo?: ShardingParams, store?: (libp2p: Libp2p) => IStore, lightPush?: (libp2p: Libp2p) => ILightPush, filter?: (libp2p: Libp2p) => IFilter, diff --git a/packages/core/src/lib/constants.ts b/packages/interfaces/src/constants.ts similarity index 100% rename from packages/core/src/lib/constants.ts rename to packages/interfaces/src/constants.ts diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 6ce8738a0d..03ff627af8 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -15,3 +15,4 @@ export * from "./libp2p.js"; export * from "./keep_alive_manager.js"; export * from "./dns_discovery.js"; export * from "./metadata.js"; +export * from "./constants.js"; diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index 40de174026..1a4dedeac9 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -2,7 +2,10 @@ import type { PubsubTopic } from "./misc.js"; export interface SingleShardInfo { clusterId: number; - shard: number; + /** + * Specifying this field indicates to the encoder/decoder that static sharding must be used. + */ + shard?: number; } export interface IRateLimitProof { diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 5a9d733274..33650580da 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -21,6 +21,13 @@ export interface IBaseProtocol { removeLibp2pEventListener: Libp2p["removeEventListener"]; } +export type ContentTopicInfo = { + clusterId: number; + contentTopics: string[]; +}; + +export type ShardingParams = ShardInfo | ContentTopicInfo; + export type ProtocolCreateOptions = { /** * Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future. @@ -39,7 +46,7 @@ export type ProtocolCreateOptions = { * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. * */ - shardInfo?: ShardInfo; + shardInfo?: ShardingParams; /** * You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property. * This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) diff --git a/packages/message-encryption/src/ecies.ts b/packages/message-encryption/src/ecies.ts index 24d7aa0fee..6a892034eb 100644 --- a/packages/message-encryption/src/ecies.ts +++ b/packages/message-encryption/src/ecies.ts @@ -1,4 +1,3 @@ -import { DefaultPubsubTopic } from "@waku/core"; import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0"; import type { EncoderOptions as BaseEncoderOptions, @@ -11,7 +10,7 @@ import type { SingleShardInfo } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; -import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; +import { determinePubsubTopic, Logger } from "@waku/utils"; import { generatePrivateKey } from "./crypto/utils.js"; import { DecodedMessage } from "./decoded_message.js"; @@ -107,9 +106,7 @@ export function createEncoder({ metaSetter }: EncoderOptions): Encoder { return new Encoder( - pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic, + determinePubsubTopic(contentTopic, pubsubTopicShardInfo), contentTopic, publicKey, sigPrivKey, @@ -200,9 +197,7 @@ export function createDecoder( pubsubTopicShardInfo?: SingleShardInfo ): Decoder { return new Decoder( - pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic, + determinePubsubTopic(contentTopic, pubsubTopicShardInfo), contentTopic, privateKey ); diff --git a/packages/message-encryption/src/symmetric.ts b/packages/message-encryption/src/symmetric.ts index 9854f5c665..841784db55 100644 --- a/packages/message-encryption/src/symmetric.ts +++ b/packages/message-encryption/src/symmetric.ts @@ -1,4 +1,3 @@ -import { DefaultPubsubTopic } from "@waku/core"; import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0"; import type { EncoderOptions as BaseEncoderOptions, @@ -11,7 +10,7 @@ import type { SingleShardInfo } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; -import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; +import { determinePubsubTopic, Logger } from "@waku/utils"; import { generateSymmetricKey } from "./crypto/utils.js"; import { DecodedMessage } from "./decoded_message.js"; @@ -107,9 +106,7 @@ export function createEncoder({ metaSetter }: EncoderOptions): Encoder { return new Encoder( - pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic, + determinePubsubTopic(contentTopic, pubsubTopicShardInfo), contentTopic, symKey, sigPrivKey, @@ -200,9 +197,7 @@ export function createDecoder( pubsubTopicShardInfo?: SingleShardInfo ): Decoder { return new Decoder( - pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic, + determinePubsubTopic(contentTopic, pubsubTopicShardInfo), contentTopic, symKey ); diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 8e7cce499e..26b5055722 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -9,7 +9,7 @@ import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; import type { PeerId } from "@libp2p/interface/peer-id"; import type { PubSub as Libp2pPubsub } from "@libp2p/interface/pubsub"; import { sha256 } from "@noble/hashes/sha256"; -import { DefaultPubsubTopic } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { ActiveSubscriptions, Callback, diff --git a/packages/relay/src/topic_only_message.ts b/packages/relay/src/topic_only_message.ts index cd0916fe2b..2335ee7320 100644 --- a/packages/relay/src/topic_only_message.ts +++ b/packages/relay/src/topic_only_message.ts @@ -1,4 +1,4 @@ -import { DefaultPubsubTopic } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import type { IDecodedMessage, IDecoder, diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index 11668689ee..cb96f5947c 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -23,7 +23,7 @@ import type { LightNode, ProtocolCreateOptions, RelayNode, - ShardInfo + ShardingParams } from "@waku/interfaces"; import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay"; @@ -180,7 +180,7 @@ type MetadataService = { }; export async function defaultLibp2p( - shardInfo?: ShardInfo, + shardInfo?: ShardingParams, wakuGossipSub?: PubsubService["pubsub"], options?: Partial, userAgent?: string diff --git a/packages/tests/src/message_collector.ts b/packages/tests/src/message_collector.ts index 91f2a80454..2c60b9f679 100644 --- a/packages/tests/src/message_collector.ts +++ b/packages/tests/src/message_collector.ts @@ -1,4 +1,5 @@ -import { DecodedMessage, DefaultPubsubTopic } from "@waku/core"; +import { DecodedMessage } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { AssertionError, expect } from "chai"; @@ -103,6 +104,49 @@ export class MessageCollector { } } + async waitForMessagesAutosharding( + numMessages: number, + options?: { + contentTopic: string; + timeoutDuration?: number; + exact?: boolean; + } + ): Promise { + const startTime = Date.now(); + const timeoutDuration = options?.timeoutDuration || 400; + const exact = options?.exact || false; + + while (this.count < numMessages) { + if (this.nwaku) { + try { + this.list = await this.nwaku.messagesAutosharding( + options!.contentTopic + ); + } catch (error) { + log.error(`Can't retrieve messages because of ${error}`); + await delay(10); + } + } + + if (Date.now() - startTime > timeoutDuration * numMessages) { + return false; + } + + await delay(10); + } + + if (exact) { + if (this.count == numMessages) { + return true; + } else { + log.warn(`Was expecting exactly ${numMessages} messages`); + return false; + } + } else { + return true; + } + } + // Verifies a received message against expected values. verifyReceivedMessage( index: number, diff --git a/packages/tests/src/node/node.ts b/packages/tests/src/node/node.ts index f14a0de279..54be7a908d 100644 --- a/packages/tests/src/node/node.ts +++ b/packages/tests/src/node/node.ts @@ -1,7 +1,7 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import { peerIdFromString } from "@libp2p/peer-id"; import { Multiaddr, multiaddr } from "@multiformats/multiaddr"; -import { DefaultPubsubTopic } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { isDefined } from "@waku/utils"; import { Logger } from "@waku/utils"; import { bytesToHex, hexToBytes } from "@waku/utils/bytes"; @@ -216,6 +216,16 @@ export class NimGoNode { ]); } + async ensureSubscriptionsAutosharding( + contentTopics: string[] + ): Promise { + this.checkProcess(); + + return this.rpcCall("post_waku_v2_relay_v1_auto_subscriptions", [ + contentTopics + ]); + } + async sendMessage( message: MessageRpcQuery, pubsubTopic: string = DefaultPubsubTopic @@ -232,6 +242,18 @@ export class NimGoNode { ]); } + async sendMessageAutosharding(message: MessageRpcQuery): Promise { + this.checkProcess(); + + if (typeof message.timestamp === "undefined") { + message.timestamp = BigInt(new Date().valueOf()) * OneMillion; + } + + return this.rpcCall("post_waku_v2_relay_v1_auto_message", [ + message + ]); + } + async messages( pubsubTopic: string = DefaultPubsubTopic ): Promise { @@ -245,6 +267,19 @@ export class NimGoNode { return msgs.filter(isDefined); } + async messagesAutosharding( + contentTopic: string + ): Promise { + this.checkProcess(); + + const msgs = await this.rpcCall( + "get_waku_v2_relay_v1_auto_messages", + [contentTopic] + ); + + return msgs.filter(isDefined); + } + async getAsymmetricKeyPair(): Promise { this.checkProcess(); diff --git a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts index 4be4cac9d3..9875e12d49 100644 --- a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts @@ -1,5 +1,6 @@ import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import type { + ContentTopicInfo, IFilterSubscription, LightNode, ShardInfo, @@ -7,6 +8,7 @@ import type { } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { + contentTopicToPubsubTopic, pubsubTopicToSingleShardInfo, singleShardInfoToPubsubTopic } from "@waku/utils"; @@ -178,3 +180,177 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { } }); }); + +describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(30000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + let subscription: IFilterSubscription; + let messageCollector: MessageCollector; + + const customContentTopic1 = "/waku/2/content/utf8"; + const customContentTopic2 = "/myapp/1/latest/proto"; + const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( + customContentTopic1, + 3 + ); + const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( + customContentTopic2, + 3 + ); + const contentTopicInfo: ContentTopicInfo = { + clusterId: 3, + contentTopics: [customContentTopic1, customContentTopic2] + }; + const customEncoder1 = createEncoder({ + contentTopic: customContentTopic1, + pubsubTopicShardInfo: { + clusterId: 3 + } + }); + const customDecoder1 = createDecoder(customContentTopic1, { clusterId: 3 }); + const customEncoder2 = createEncoder({ + contentTopic: customContentTopic2, + pubsubTopicShardInfo: { + clusterId: 3 + } + }); + const customDecoder2 = createDecoder(customContentTopic2, { clusterId: 3 }); + + this.beforeEach(async function () { + this.timeout(15000); + [nwaku, waku] = await runNodes( + this, + [autoshardingPubsubTopic1, autoshardingPubsubTopic2], + contentTopicInfo + ); + subscription = await waku.filter.createSubscription( + pubsubTopicToSingleShardInfo(autoshardingPubsubTopic1) + ); + messageCollector = new MessageCollector(); + }); + + this.afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); + }); + + it("Subscribe and receive messages on autosharded pubsubtopic", async function () { + await subscription.subscribe([customDecoder1], messageCollector.callback); + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 + }) + ).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: autoshardingPubsubTopic1, + expectedMessageText: "M1" + }); + }); + + it("Subscribe and receive messages on 2 different pubsubtopics", async function () { + await subscription.subscribe([customDecoder1], messageCollector.callback); + + // Subscribe from the same lightnode to the 2nd pubsubtopic + const subscription2 = await waku.filter.createSubscription( + pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) + ); + + const messageCollector2 = new MessageCollector(); + + await subscription2.subscribe([customDecoder2], messageCollector2.callback); + + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); + + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 + }) + ).to.eq(true); + expect( + await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic2 + }) + ).to.eq(true); + + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: autoshardingPubsubTopic1, + expectedMessageText: "M1" + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: autoshardingPubsubTopic2, + expectedMessageText: "M2" + }); + }); + + it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { + await subscription.subscribe([customDecoder1], messageCollector.callback); + + // Set up and start a new nwaku node with customPubsubTopic1 + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + pubsubTopic: [autoshardingPubsubTopic2] + }); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + + // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic + const subscription2 = await waku.filter.createSubscription( + pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2), + await nwaku2.getPeerId() + ); + await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); + + const messageCollector2 = new MessageCollector(); + + await subscription2.subscribe([customDecoder2], messageCollector2.callback); + + // Making sure that messages are send and reveiced for both subscriptions + // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 + while ( + !(await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 + })) || + !(await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic2 + })) + ) { + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); + } + + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: autoshardingPubsubTopic1, + expectedMessageText: "M1" + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: autoshardingPubsubTopic2, + expectedMessageText: "M2" + }); + }); + + it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () { + // this subscription object is set up with the `customPubsubTopic` but we're passing it a Decoder with the `DefaultPubsubTopic` + try { + await subscription.subscribe([customDecoder2], messageCollector.callback); + } catch (error) { + expect((error as Error).message).to.include( + "Pubsub topic not configured" + ); + } + }); +}); diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts index 2ec5cd8e1f..8a447f9360 100644 --- a/packages/tests/tests/filter/ping.node.spec.ts +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -1,4 +1,4 @@ -import { DefaultPubsubTopic } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import type { IFilterSubscription, LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index d58b373934..db72aba606 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -1,5 +1,6 @@ -import { DefaultPubsubTopic, waitForRemotePeer } from "@waku/core"; +import { waitForRemotePeer } from "@waku/core"; import type { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index 68b89e3d72..4e382a47e8 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -1,10 +1,6 @@ -import { - createDecoder, - createEncoder, - DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; +import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import type { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { ecies, diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts index f2d4a405df..2ec1588b29 100644 --- a/packages/tests/tests/filter/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -1,5 +1,6 @@ -import { createDecoder, createEncoder, DefaultPubsubTopic } from "@waku/core"; +import { createDecoder, createEncoder } from "@waku/core"; import type { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index d9cb4f0135..2541184101 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -1,14 +1,10 @@ +import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import { - createDecoder, - createEncoder, DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; -import { IFilterSubscription, LightNode, Protocols, - ShardInfo + ShardingParams } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { Logger } from "@waku/utils"; @@ -50,7 +46,7 @@ export async function runNodes( context: Context, //TODO: change this to use `ShardInfo` instead of `string[]` pubsubTopics: string[], - shardInfo?: ShardInfo + shardInfo?: ShardingParams ): Promise<[NimGoNode, LightNode]> { const nwaku = new NimGoNode(makeLogFileName(context)); diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index 7e569cba47..cf00748fb2 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -1,5 +1,10 @@ -import { createEncoder, DefaultPubsubTopic } from "@waku/core"; -import { IRateLimitProof, LightNode, SendError } from "@waku/interfaces"; +import { createEncoder } from "@waku/core"; +import { + DefaultPubsubTopic, + IRateLimitProof, + LightNode, + SendError +} from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts index c598122ab4..32d7bad791 100644 --- a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts @@ -1,13 +1,17 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import { createEncoder, waitForRemotePeer } from "@waku/core"; import { + ContentTopicInfo, LightNode, Protocols, SendResult, ShardInfo, SingleShardInfo } from "@waku/interfaces"; -import { singleShardInfoToPubsubTopic } from "@waku/utils"; +import { + contentTopicToPubsubTopic, + singleShardInfoToPubsubTopic +} from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -116,7 +120,7 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { messageCollector2.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, - expectedPubsubTopic: customPubsubTopic2 + expectedPubsubTopic: customPubsubTopic1 }); }); @@ -169,3 +173,161 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { }); }); }); + +describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { + this.timeout(30000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + let messageCollector: MessageCollector; + + // When using lightpush, we have to use a cluster id of 1 because that is the default cluster id for autosharding + // With a different cluster id, we never find a viable peer + const clusterId = 1; + const customContentTopic1 = "/waku/2/content/utf8"; + const customContentTopic2 = "/myapp/1/latest/proto"; + const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( + customContentTopic1, + clusterId + ); + const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( + customContentTopic2, + clusterId + ); + const contentTopicInfo: ContentTopicInfo = { + clusterId, + contentTopics: [customContentTopic1, customContentTopic2] + }; + const customEncoder1 = createEncoder({ + contentTopic: customContentTopic1, + pubsubTopicShardInfo: { + clusterId + } + }); + const customEncoder2 = createEncoder({ + contentTopic: customContentTopic2, + pubsubTopicShardInfo: { clusterId } + }); + + let nimPeerId: PeerId; + + this.beforeEach(async function () { + this.timeout(15000); + [nwaku, waku] = await runNodes( + this, + [autoshardingPubsubTopic1, autoshardingPubsubTopic2], + contentTopicInfo + ); + messageCollector = new MessageCollector(nwaku); + nimPeerId = await nwaku.getPeerId(); + }); + + this.afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); + }); + + it("Push message on custom pubsubTopic", async function () { + const pushResponse = await waku.lightPush.send(customEncoder1, { + payload: utf8ToBytes(messageText) + }); + + expect(pushResponse.errors).to.be.empty; + expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); + + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 + }) + ).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: customContentTopic1 + }); + }); + + it("Subscribe and receive messages on 2 different pubsubtopics", async function () { + const pushResponse1 = await waku.lightPush.send(customEncoder1, { + payload: utf8ToBytes("M1") + }); + const pushResponse2 = await waku.lightPush.send(customEncoder2, { + payload: utf8ToBytes("M2") + }); + expect(pushResponse1.recipients[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse2.recipients[0].toString()).to.eq(nimPeerId.toString()); + + const messageCollector2 = new MessageCollector(nwaku); + + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 + }) + ).to.eq(true); + + expect( + await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic2 + }) + ).to.eq(true); + + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: autoshardingPubsubTopic1 + }); + messageCollector2.verifyReceivedMessage(0, { + expectedMessageText: "M2", + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: autoshardingPubsubTopic2 + }); + }); + + it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { + // Set up and start a new nwaku node with Default PubsubTopic + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + pubsubTopic: [autoshardingPubsubTopic2] + }); + await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const messageCollector2 = new MessageCollector(nwaku2); + + let pushResponse1: SendResult; + let pushResponse2: SendResult; + // Making sure that we send messages to both nwaku nodes + // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 + while ( + !(await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 + })) || + !(await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic2 + })) || + pushResponse1!.recipients[0].toString() === + pushResponse2!.recipients[0].toString() + ) { + pushResponse1 = await waku.lightPush.send(customEncoder1, { + payload: utf8ToBytes("M1") + }); + pushResponse2 = await waku.lightPush.send(customEncoder2, { + payload: utf8ToBytes("M2") + }); + } + + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: autoshardingPubsubTopic1 + }); + messageCollector2.verifyReceivedMessage(0, { + expectedMessageText: "M2", + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: autoshardingPubsubTopic2 + }); + }); +}); diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index 4f30082c09..635d8e3989 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -1,9 +1,10 @@ +import { createEncoder, waitForRemotePeer } from "@waku/core"; import { - createEncoder, DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; -import { LightNode, Protocols, ShardInfo } from "@waku/interfaces"; + LightNode, + Protocols, + ShardingParams +} from "@waku/interfaces"; import { createLightNode, utf8ToBytes } from "@waku/sdk"; import { Logger } from "@waku/utils"; @@ -19,7 +20,7 @@ export const messagePayload = { payload: utf8ToBytes(messageText) }; export async function runNodes( context: Mocha.Context, pubsubTopics: string[], - shardInfo?: ShardInfo + shardInfo?: ShardingParams ): Promise<[NimGoNode, LightNode]> { const nwaku = new NimGoNode(makeLogFileName(context)); await nwaku.start( @@ -44,6 +45,13 @@ export async function runNodes( if (waku) { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.LightPush]); + if ( + shardInfo && + "contentTopics" in shardInfo && + shardInfo.contentTopics.length > 0 + ) { + await nwaku.ensureSubscriptionsAutosharding(shardInfo.contentTopics); + } await nwaku.ensureSubscriptions(pubsubTopics); return [nwaku, waku]; } else { diff --git a/packages/tests/tests/relay/interop.node.spec.ts b/packages/tests/tests/relay/interop.node.spec.ts index ff6030b5a6..47cf2eeeba 100644 --- a/packages/tests/tests/relay/interop.node.spec.ts +++ b/packages/tests/tests/relay/interop.node.spec.ts @@ -1,11 +1,6 @@ import type { PeerId } from "@libp2p/interface/peer-id"; -import { - DecodedMessage, - DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; -import { RelayNode } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; +import { DecodedMessage, waitForRemotePeer } from "@waku/core"; +import { DefaultPubsubTopic, Protocols, RelayNode } from "@waku/interfaces"; import { createRelayNode } from "@waku/sdk"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index a870f0cba2..f1cfaf886f 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -4,10 +4,18 @@ import { DecodedMessage, waitForRemotePeer } from "@waku/core"; -import { RelayNode, ShardInfo, SingleShardInfo } from "@waku/interfaces"; +import { + ContentTopicInfo, + RelayNode, + ShardInfo, + SingleShardInfo +} from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createRelayNode } from "@waku/sdk"; -import { singleShardInfoToPubsubTopic } from "@waku/utils"; +import { + contentTopicToPubsubTopic, + singleShardInfoToPubsubTopic +} from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -305,3 +313,328 @@ describe("Waku Relay, multiple pubsub topics", function () { expect(waku2ReceivedMsg.pubsubTopic).to.eq(customPubsubTopic1); }); }); + +describe("Waku Relay (Autosharding), multiple pubsub topics", function () { + this.timeout(15000); + let waku1: RelayNode; + let waku2: RelayNode; + let waku3: RelayNode; + + const customContentTopic1 = "/waku/2/content/utf8"; + const customContentTopic2 = "/myapp/1/latest/proto"; + const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( + customContentTopic1, + 3 + ); + const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( + customContentTopic2, + 3 + ); + const contentTopicInfo1: ContentTopicInfo = { + clusterId: 3, + contentTopics: [customContentTopic1] + }; + const contentTopicInfo2: ContentTopicInfo = { + clusterId: 3, + contentTopics: [customContentTopic2] + }; + const customEncoder1 = createEncoder({ + contentTopic: customContentTopic1, + pubsubTopicShardInfo: { + clusterId: 3 + } + }); + const customDecoder1 = createDecoder(customContentTopic1, { clusterId: 3 }); + const customEncoder2 = createEncoder({ + contentTopic: customContentTopic2, + pubsubTopicShardInfo: { clusterId: 3 } + }); + const customDecoder2 = createDecoder(customContentTopic2, { clusterId: 3 }); + const contentTopicInfoBothShards: ContentTopicInfo = { + clusterId: 3, + contentTopics: [customContentTopic1, customContentTopic2] + }; + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([], [waku1, waku2, waku3]); + }); + + [ + { + pubsub: autoshardingPubsubTopic1, + shardInfo: contentTopicInfo1, + encoder: customEncoder1, + decoder: customDecoder1 + }, + { + pubsub: autoshardingPubsubTopic2, + shardInfo: contentTopicInfo2, + encoder: customEncoder2, + decoder: customDecoder2 + } + ].forEach((testItem) => { + it(`3 nodes on ${testItem.pubsub} topic`, async function () { + const [msgCollector1, msgCollector2, msgCollector3] = Array(3) + .fill(null) + .map(() => new MessageCollector()); + + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + shardInfo: testItem.shardInfo, + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + shardInfo: testItem.shardInfo, + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + shardInfo: testItem.shardInfo, + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), + waitForRemotePeer(waku3, [Protocols.Relay]) + ]); + + await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback); + await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback); + await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback); + + // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network + const relayResponse1 = await waku1.relay.send(testItem.encoder, { + payload: utf8ToBytes("M1") + }); + const relayResponse2 = await waku2.relay.send(testItem.encoder, { + payload: utf8ToBytes("M2") + }); + const relayResponse3 = await waku3.relay.send(testItem.encoder, { + payload: utf8ToBytes("M3") + }); + + expect(relayResponse1.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(relayResponse3.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + waku1.libp2p.peerId.toString() + ); + expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + waku3.libp2p.peerId.toString() + ); + + expect( + await msgCollector1.waitForMessagesAutosharding(2, { + contentTopic: testItem.encoder.contentTopic, + exact: true + }) + ).to.eq(true); + expect( + await msgCollector2.waitForMessagesAutosharding(2, { + contentTopic: testItem.encoder.contentTopic, + exact: true + }) + ).to.eq(true); + expect( + await msgCollector3.waitForMessagesAutosharding(2, { + contentTopic: testItem.encoder.contentTopic, + exact: true + }) + ).to.eq(true); + + expect( + msgCollector1.hasMessage(testItem.encoder.contentTopic, "M2") + ).to.eq(true); + expect( + msgCollector1.hasMessage(testItem.encoder.contentTopic, "M3") + ).to.eq(true); + expect( + msgCollector2.hasMessage(testItem.encoder.contentTopic, "M1") + ).to.eq(true); + expect( + msgCollector2.hasMessage(testItem.encoder.contentTopic, "M3") + ).to.eq(true); + expect( + msgCollector3.hasMessage(testItem.encoder.contentTopic, "M1") + ).to.eq(true); + expect( + msgCollector3.hasMessage(testItem.encoder.contentTopic, "M2") + ).to.eq(true); + }); + }); + + it("Nodes with multiple pubsub topic", async function () { + const [msgCollector1, msgCollector2, msgCollector3] = Array(3) + .fill(null) + .map(() => new MessageCollector()); + + // Waku1 and waku2 are using multiple pubsub topis + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + shardInfo: contentTopicInfoBothShards, + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + shardInfo: contentTopicInfoBothShards, + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + shardInfo: contentTopicInfo1, + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), + waitForRemotePeer(waku3, [Protocols.Relay]) + ]); + + await waku1.relay.subscribe( + [customDecoder1, customDecoder2], + msgCollector1.callback + ); + await waku2.relay.subscribe( + [customDecoder1, customDecoder2], + msgCollector2.callback + ); + await waku3.relay.subscribe([customDecoder1], msgCollector3.callback); + + // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network + // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic + await waku1.relay.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku1.relay.send(customEncoder2, { payload: utf8ToBytes("M2") }); + await waku2.relay.send(customEncoder1, { payload: utf8ToBytes("M3") }); + await waku2.relay.send(customEncoder2, { payload: utf8ToBytes("M4") }); + await waku3.relay.send(customEncoder1, { payload: utf8ToBytes("M5") }); + await waku3.relay.send(customEncoder2, { payload: utf8ToBytes("M6") }); + + expect( + await msgCollector1.waitForMessagesAutosharding(3, { + contentTopic: customContentTopic1, + exact: true + }) + ).to.eq(true); + expect( + await msgCollector1.waitForMessagesAutosharding(3, { + contentTopic: customContentTopic2, + exact: true + }) + ).to.eq(true); + expect( + await msgCollector2.waitForMessagesAutosharding(3, { + contentTopic: customContentTopic1, + exact: true + }) + ).to.eq(true); + expect( + await msgCollector2.waitForMessagesAutosharding(3, { + contentTopic: customContentTopic2, + exact: true + }) + ).to.eq(true); + expect( + await msgCollector3.waitForMessagesAutosharding(2, { + contentTopic: customContentTopic1, + exact: true + }) + ).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic1, "M3")).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic2, "M4")).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic1, "M5")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic1, "M1")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic2, "M2")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic1, "M5")).to.eq(true); + expect(msgCollector3.hasMessage(customContentTopic1, "M1")).to.eq(true); + expect(msgCollector3.hasMessage(customContentTopic1, "M3")).to.eq(true); + }); + + it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () { + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + shardInfo: contentTopicInfo1, + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + shardInfo: contentTopicInfo1, + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]) + ]); + + const messageText = "Communicating using a custom pubsub topic"; + + const waku2ReceivedMsgPromise: Promise = new Promise( + (resolve) => { + void waku2.relay.subscribe([customDecoder1], resolve); + } + ); + + // The promise **fails** if we receive a message on the default + // pubsub topic. + const waku3NoMsgPromise: Promise = new Promise( + (resolve, reject) => { + void waku3.relay.subscribe([TestDecoder], reject); + setTimeout(resolve, 1000); + } + ); + + await waku1.relay.send(customEncoder1, { + payload: utf8ToBytes(messageText) + }); + + const waku2ReceivedMsg = await waku2ReceivedMsgPromise; + await waku3NoMsgPromise; + + expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText); + expect(waku2ReceivedMsg.pubsubTopic).to.eq(autoshardingPubsubTopic1); + }); +}); diff --git a/packages/tests/tests/relay/subscribe.node.spec.ts b/packages/tests/tests/relay/subscribe.node.spec.ts index d2c5fd59b9..bb67659cf3 100644 --- a/packages/tests/tests/relay/subscribe.node.spec.ts +++ b/packages/tests/tests/relay/subscribe.node.spec.ts @@ -1,5 +1,5 @@ -import { createDecoder, createEncoder, DefaultPubsubTopic } from "@waku/core"; -import { RelayNode } from "@waku/interfaces"; +import { createDecoder, createEncoder } from "@waku/core"; +import { DefaultPubsubTopic, RelayNode } from "@waku/interfaces"; import { createRelayNode } from "@waku/sdk"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index 833795eb18..fcaacf097b 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -1,8 +1,17 @@ import { bootstrap } from "@libp2p/bootstrap"; import type { PeerId } from "@libp2p/interface/peer-id"; import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; -import { createLightNode, LightNode, ShardInfo, Tags } from "@waku/sdk"; -import { singleShardInfoToPubsubTopic } from "@waku/utils"; +import { + ContentTopicInfo, + createLightNode, + LightNode, + ShardInfo, + Tags +} from "@waku/sdk"; +import { + contentTopicToPubsubTopic, + singleShardInfoToPubsubTopic +} from "@waku/utils"; import chai, { expect } from "chai"; import chaiAsPromised from "chai-as-promised"; import Sinon, { SinonSpy } from "sinon"; @@ -184,3 +193,176 @@ describe("Static Sharding: Peer Management", function () { }); }); }); + +describe("Autosharding: Peer Management", function () { + const ContentTopic = "/waku/2/content/test.js"; + + describe("Peer Exchange", function () { + let waku: LightNode; + let nwaku1: NimGoNode; + let nwaku2: NimGoNode; + let nwaku3: NimGoNode; + + let dialPeerSpy: SinonSpy; + + beforeEach(async function () { + this.timeout(15000); + nwaku1 = new NimGoNode(makeLogFileName(this) + "1_auto"); + nwaku2 = new NimGoNode(makeLogFileName(this) + "2_auto"); + nwaku3 = new NimGoNode(makeLogFileName(this) + "3_auto"); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku1, nwaku2, nwaku3], waku); + dialPeerSpy && dialPeerSpy.restore(); + }); + + it("all px service nodes subscribed to the shard topic should be dialed", async function () { + this.timeout(100_000); + + const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, 1)]; + const contentTopicInfo: ContentTopicInfo = { + clusterId: 1, + contentTopics: [ContentTopic] + }; + + await nwaku1.start({ + pubsubTopic: pubsubTopics, + discv5Discovery: true, + peerExchange: true, + relay: true + }); + + const enr1 = (await nwaku1.info()).enrUri; + + await nwaku2.start({ + pubsubTopic: pubsubTopics, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: enr1, + relay: true + }); + + const enr2 = (await nwaku2.info()).enrUri; + + await nwaku3.start({ + pubsubTopic: pubsubTopics, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: enr2, + relay: true + }); + const nwaku3Ma = await nwaku3.getMultiaddrWithId(); + + waku = await createLightNode({ + shardInfo: contentTopicInfo, + libp2p: { + peerDiscovery: [ + bootstrap({ list: [nwaku3Ma.toString()] }), + wakuPeerExchangeDiscovery() + ] + } + }); + + await waku.start(); + + dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer"); + + const pxPeersDiscovered = new Set(); + + await new Promise((resolve) => { + waku.libp2p.addEventListener("peer:discovery", (evt) => { + return void (async () => { + const peerId = evt.detail.id; + const peer = await waku.libp2p.peerStore.get(peerId); + const tags = Array.from(peer.tags.keys()); + if (tags.includes(Tags.PEER_EXCHANGE)) { + pxPeersDiscovered.add(peerId); + if (pxPeersDiscovered.size === 2) { + resolve(); + } + } + })(); + }); + }); + + await delay(1000); + + expect(dialPeerSpy.callCount).to.equal(3); + }); + + it("px service nodes not subscribed to the shard should not be dialed", async function () { + this.timeout(100_000); + const pubsubTopicsToDial = [contentTopicToPubsubTopic(ContentTopic, 1)]; + const contentTopicInfoToDial: ContentTopicInfo = { + clusterId: 1, + contentTopics: [ContentTopic] + }; + const pubsubTopicsToIgnore = [contentTopicToPubsubTopic(ContentTopic, 2)]; + + // this service node is not subscribed to the shard + await nwaku1.start({ + pubsubTopic: pubsubTopicsToIgnore, + relay: true, + discv5Discovery: true, + peerExchange: true + }); + + const enr1 = (await nwaku1.info()).enrUri; + + await nwaku2.start({ + pubsubTopic: pubsubTopicsToDial, + relay: true, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: enr1 + }); + + const enr2 = (await nwaku2.info()).enrUri; + + await nwaku3.start({ + relay: true, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: enr2 + }); + const nwaku3Ma = await nwaku3.getMultiaddrWithId(); + + waku = await createLightNode({ + shardInfo: contentTopicInfoToDial, + libp2p: { + peerDiscovery: [ + bootstrap({ list: [nwaku3Ma.toString()] }), + wakuPeerExchangeDiscovery() + ] + } + }); + + dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer"); + + await waku.start(); + + const pxPeersDiscovered = new Set(); + + await new Promise((resolve) => { + waku.libp2p.addEventListener("peer:discovery", (evt) => { + return void (async () => { + const peerId = evt.detail.id; + const peer = await waku.libp2p.peerStore.get(peerId); + const tags = Array.from(peer.tags.keys()); + if (tags.includes(Tags.PEER_EXCHANGE)) { + pxPeersDiscovered.add(peerId); + if (pxPeersDiscovered.size === 1) { + resolve(); + } + } + })(); + }); + }); + + await delay(1000); + expect(dialPeerSpy.callCount).to.equal(2); + }); + }); +}); diff --git a/packages/tests/tests/sharding/running_nodes.spec.ts b/packages/tests/tests/sharding/running_nodes.spec.ts index 043c0db423..41cbaefa86 100644 --- a/packages/tests/tests/sharding/running_nodes.spec.ts +++ b/packages/tests/tests/sharding/running_nodes.spec.ts @@ -20,6 +20,7 @@ const shardInfoBothShards: ShardInfo = { clusterId: 0, shards: [2, 3] }; const singleShardInfo1: SingleShardInfo = { clusterId: 0, shard: 2 }; const singleShardInfo2: SingleShardInfo = { clusterId: 0, shard: 3 }; const ContentTopic = "/waku/2/content/test.js"; +const ContentTopic2 = "/myapp/1/latest/proto"; describe("Static Sharding: Running Nodes", () => { let waku: LightNode; @@ -93,3 +94,51 @@ describe("Static Sharding: Running Nodes", () => { } }); }); + +describe("Autosharding: Running Nodes", () => { + let waku: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15_000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.start({ store: true, lightpush: true, relay: true }); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes(nwaku, waku); + }); + + it("configure the node with multiple pubsub topics", async function () { + this.timeout(15_000); + waku = await createLightNode({ + shardInfo: { + ...shardInfoBothShards, + // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards + contentTopics: [ContentTopic, ContentTopic2] + } + }); + + const encoder1 = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { clusterId: 0 } + }); + + const encoder2 = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { clusterId: 0 } + }); + + const request1 = await waku.lightPush.send(encoder1, { + payload: utf8ToBytes("Hello World") + }); + + const request2 = await waku.lightPush.send(encoder2, { + payload: utf8ToBytes("Hello World") + }); + + expect(request1.recipients.length).to.eq(0); + expect(request2.recipients.length).to.eq(0); + }); +}); diff --git a/packages/tests/tests/store/cursor.node.spec.ts b/packages/tests/tests/store/cursor.node.spec.ts index cddf6602af..55bfe97d2a 100644 --- a/packages/tests/tests/store/cursor.node.spec.ts +++ b/packages/tests/tests/store/cursor.node.spec.ts @@ -1,5 +1,6 @@ -import { createCursor, DecodedMessage, DefaultPubsubTopic } from "@waku/core"; +import { createCursor, DecodedMessage } from "@waku/core"; import type { LightNode } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { bytesToUtf8 } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/store/error_handling.node.spec.ts b/packages/tests/tests/store/error_handling.node.spec.ts index e18f9bcc8f..148f76a6a8 100644 --- a/packages/tests/tests/store/error_handling.node.spec.ts +++ b/packages/tests/tests/store/error_handling.node.spec.ts @@ -1,4 +1,4 @@ -import { DefaultPubsubTopic } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { IMessage, type LightNode } from "@waku/interfaces"; import { expect } from "chai"; diff --git a/packages/tests/tests/store/index.node.spec.ts b/packages/tests/tests/store/index.node.spec.ts index c2347e54e0..c5e2c8e127 100644 --- a/packages/tests/tests/store/index.node.spec.ts +++ b/packages/tests/tests/store/index.node.spec.ts @@ -1,11 +1,6 @@ -import { - createDecoder, - DecodedMessage, - DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; +import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core"; import type { IMessage, LightNode } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; +import { DefaultPubsubTopic, Protocols } from "@waku/interfaces"; import { generatePrivateKey, generateSymmetricKey, diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index d476ebf0b3..e8c3fc9665 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -1,6 +1,7 @@ -import { waitForRemotePeer } from "@waku/core"; -import type { IMessage, LightNode } from "@waku/interfaces"; +import { createDecoder, waitForRemotePeer } from "@waku/core"; +import type { ContentTopicInfo, IMessage, LightNode } from "@waku/interfaces"; import { createLightNode, Protocols } from "@waku/sdk"; +import { contentTopicToPubsubTopic } from "@waku/utils"; import { expect } from "chai"; import { @@ -19,6 +20,7 @@ import { customShardedPubsubTopic2, processQueriedMessages, sendMessages, + sendMessagesAutosharding, shardInfo1, shardInfoBothShards, startAndConnectLightNode, @@ -169,3 +171,153 @@ describe("Waku Store, custom pubsub topic", function () { } }); }); + +describe("Waku Store (Autosharding), custom pubsub topic", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + + const customContentTopic1 = "/waku/2/content/utf8"; + const customContentTopic2 = "/myapp/1/latest/proto"; + const clusterId = 1; + const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( + customContentTopic1, + clusterId + ); + const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( + customContentTopic2, + clusterId + ); + const contentTopicInfo1: ContentTopicInfo = { + clusterId, + contentTopics: [customContentTopic1] + }; + const customDecoder1 = createDecoder(customContentTopic1, { + clusterId + }); + const customDecoder2 = createDecoder(customContentTopic2, { + clusterId + }); + const contentTopicInfoBothShards: ContentTopicInfo = { + clusterId, + contentTopics: [customContentTopic1, customContentTopic2] + }; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.start({ + store: true, + pubsubTopic: [autoshardingPubsubTopic1, autoshardingPubsubTopic2], + relay: true + }); + await nwaku.ensureSubscriptionsAutosharding([ + customContentTopic1, + customContentTopic2 + ]); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); + }); + + it("Generator, custom pubsub topic", async function () { + await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); + waku = await startAndConnectLightNode(nwaku, [], contentTopicInfo1); + const messages = await processQueriedMessages( + waku, + [customDecoder1], + autoshardingPubsubTopic1 + ); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result).to.not.eq(-1); + }); + + it("Generator, 2 different pubsubtopics", async function () { + this.timeout(10000); + + const totalMsgs = 10; + await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); + await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic2); + + waku = await startAndConnectLightNode( + nwaku, + [], + contentTopicInfoBothShards + ); + + const customMessages = await processQueriedMessages( + waku, + [customDecoder1], + autoshardingPubsubTopic1 + ); + expect(customMessages?.length).eq(totalMsgs); + const result1 = customMessages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result1).to.not.eq(-1); + + const testMessages = await processQueriedMessages( + waku, + [customDecoder2], + autoshardingPubsubTopic2 + ); + expect(testMessages?.length).eq(totalMsgs); + const result2 = testMessages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result2).to.not.eq(-1); + }); + + it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () { + this.timeout(10000); + + // Set up and start a new nwaku node with Default Pubsubtopic + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + store: true, + pubsubTopic: [autoshardingPubsubTopic2], + relay: true + }); + await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); + + const totalMsgs = 10; + await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); + await sendMessagesAutosharding(nwaku2, totalMsgs, customContentTopic2); + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1, + shardInfo: contentTopicInfoBothShards + }); + await waku.start(); + + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + let customMessages: IMessage[] = []; + let testMessages: IMessage[] = []; + + while ( + customMessages.length != totalMsgs || + testMessages.length != totalMsgs + ) { + customMessages = await processQueriedMessages( + waku, + [customDecoder1], + autoshardingPubsubTopic1 + ); + testMessages = await processQueriedMessages( + waku, + [customDecoder2], + autoshardingPubsubTopic2 + ); + } + }); +}); diff --git a/packages/tests/tests/store/order.node.spec.ts b/packages/tests/tests/store/order.node.spec.ts index f4836f51db..9701807a05 100644 --- a/packages/tests/tests/store/order.node.spec.ts +++ b/packages/tests/tests/store/order.node.spec.ts @@ -1,5 +1,6 @@ -import { DecodedMessage, DefaultPubsubTopic, PageDirection } from "@waku/core"; +import { DecodedMessage, PageDirection } from "@waku/core"; import type { IMessage, LightNode } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { expect } from "chai"; import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js"; diff --git a/packages/tests/tests/store/page_size.node.spec.ts b/packages/tests/tests/store/page_size.node.spec.ts index 60092fd1f4..c4bfee0757 100644 --- a/packages/tests/tests/store/page_size.node.spec.ts +++ b/packages/tests/tests/store/page_size.node.spec.ts @@ -1,4 +1,4 @@ -import { DefaultPubsubTopic } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import type { LightNode } from "@waku/interfaces"; import { expect } from "chai"; diff --git a/packages/tests/tests/store/sorting.node.spec.ts b/packages/tests/tests/store/sorting.node.spec.ts index a403febf69..7fa3ca6410 100644 --- a/packages/tests/tests/store/sorting.node.spec.ts +++ b/packages/tests/tests/store/sorting.node.spec.ts @@ -1,5 +1,6 @@ -import { DecodedMessage, DefaultPubsubTopic, PageDirection } from "@waku/core"; +import { DecodedMessage, PageDirection } from "@waku/core"; import type { IMessage, LightNode } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js"; diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts index 4a52290f66..76ce57f09f 100644 --- a/packages/tests/tests/store/utils.ts +++ b/packages/tests/tests/store/utils.ts @@ -3,10 +3,15 @@ import { createEncoder, DecodedMessage, Decoder, - DefaultPubsubTopic, waitForRemotePeer } from "@waku/core"; -import { LightNode, Protocols, ShardInfo } from "@waku/interfaces"; +import { + DefaultPubsubTopic, + LightNode, + Protocols, + ShardInfo, + ShardingParams +} from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; import { expect } from "chai"; @@ -61,6 +66,24 @@ export async function sendMessages( } } +export async function sendMessagesAutosharding( + instance: NimGoNode, + numMessages: number, + contentTopic: string +): Promise { + for (let i = 0; i < numMessages; i++) { + expect( + await instance.sendMessageAutosharding( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: contentTopic + }) + ) + ).to.eq(true); + await delay(1); // to ensure each timestamp is unique. + } +} + export async function processQueriedMessages( instance: LightNode, decoders: Array, @@ -81,7 +104,7 @@ export async function processQueriedMessages( export async function startAndConnectLightNode( instance: NimGoNode, pubsubTopics: string[] = [DefaultPubsubTopic], - shardInfo?: ShardInfo + shardInfo?: ShardingParams ): Promise { const waku = await createLightNode({ ...((pubsubTopics.length !== 1 || diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index eab0bb35d4..ecb1f02bf5 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -1,14 +1,8 @@ import type { PeerStore } from "@libp2p/interface/peer-store"; import type { Peer } from "@libp2p/interface/peer-store"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; -import { - createDecoder, - createEncoder, - DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; -import { LightNode } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; +import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; +import { DefaultPubsubTopic, LightNode, Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { toAsyncIterator } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index cd33cb78fb..586d2574c0 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -1,6 +1,6 @@ -import { DefaultPubsubTopic, waitForRemotePeer } from "@waku/core"; +import { waitForRemotePeer } from "@waku/core"; import type { LightNode, RelayNode } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; +import { DefaultPubsubTopic, Protocols } from "@waku/interfaces"; import { createLightNode, createRelayNode } from "@waku/sdk"; import { expect } from "chai"; diff --git a/packages/utils/package.json b/packages/utils/package.json index 7a22787da9..3af685e546 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -68,6 +68,7 @@ }, "dependencies": { "@noble/hashes": "^1.3.2", + "@waku/interfaces": "0.0.20", "chai": "^4.3.10", "debug": "^4.3.4", "uint8arrays": "^4.0.4" @@ -77,7 +78,6 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.2.3", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.20", "cspell": "^7.3.2", "npm-run-all": "^4.1.5", "rollup": "^4.6.0" diff --git a/packages/utils/src/common/sharding.spec.ts b/packages/utils/src/common/sharding.spec.ts index fd206dd788..06484615ac 100644 --- a/packages/utils/src/common/sharding.spec.ts +++ b/packages/utils/src/common/sharding.spec.ts @@ -1,6 +1,11 @@ import { expect } from "chai"; -import { contentTopicToShardIndex, ensureValidContentTopic } from "./sharding"; +import { + contentTopicsByPubsubTopic, + contentTopicToPubsubTopic, + contentTopicToShardIndex, + ensureValidContentTopic +} from "./sharding"; const testInvalidCases = ( contentTopics: string[], @@ -91,10 +96,35 @@ describe("contentTopicToShardIndex", () => { it("converts content topics to expected shard index", () => { const contentTopics: [string, number][] = [ ["/toychat/2/huilong/proto", 3], - ["/myapp/1/latest/proto", 0] + ["/myapp/1/latest/proto", 0], + ["/waku/2/content/test.js", 1] ]; for (const [topic, shard] of contentTopics) { expect(contentTopicToShardIndex(topic)).to.eq(shard); } }); + + it("topics with same application and version share the same shard", () => { + const contentTopics: [string, string][] = [ + ["/toychat/2/huilong/proto", "/toychat/2/othertopic/otherencoding"], + ["/myapp/1/latest/proto", "/myapp/1/new/proto"], + ["/waku/2/content/test.js", "/waku/2/users/proto"] + ]; + for (const [topic1, topic2] of contentTopics) { + expect(contentTopicToShardIndex(topic1)).to.eq( + contentTopicToShardIndex(topic2) + ); + } + }); +}); + +describe("contentTopicsByPubsubTopic", () => { + it("groups content topics by expected pubsub topic", () => { + const contentTopics = ["/toychat/2/huilong/proto", "/myapp/1/latest/proto"]; + const grouped = contentTopicsByPubsubTopic(contentTopics); + for (const contentTopic of contentTopics) { + const pubsubTopic = contentTopicToPubsubTopic(contentTopic); + expect(grouped.get(pubsubTopic)?.includes(contentTopic)).to.be.true; + } + }); }); diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index 648cb72b17..f4a8afb71b 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -1,5 +1,10 @@ import { sha256 } from "@noble/hashes/sha256"; -import type { PubsubTopic, ShardInfo, SingleShardInfo } from "@waku/interfaces"; +import { + DefaultPubsubTopic, + PubsubTopic, + ShardingParams, + SingleShardInfo +} from "@waku/interfaces"; import { concat, utf8ToBytes } from "../bytes/index.js"; @@ -13,14 +18,20 @@ export const singleShardInfoToPubsubTopic = ( }; export const shardInfoToPubsubTopics = ( - shardInfo: ShardInfo + shardInfo: ShardingParams ): PubsubTopic[] => { - if (shardInfo.clusterId === undefined || shardInfo.shards === undefined) - throw new Error("Invalid shard"); - - return shardInfo.shards.map( - (index) => `/waku/2/rs/${shardInfo.clusterId}/${index}` - ); + if (shardInfo.clusterId === undefined) + throw new Error("Cluster ID must be specified"); + if ("contentTopics" in shardInfo) { + return shardInfo.contentTopics.map((contentTopic) => + contentTopicToPubsubTopic(contentTopic, shardInfo.clusterId) + ); + } else { + if (shardInfo.shards === undefined) throw new Error("Invalid shard"); + return shardInfo.shards.map( + (index) => `/waku/2/rs/${shardInfo.clusterId}/${index}` + ); + } }; export const pubsubTopicToSingleShardInfo = ( @@ -140,3 +151,43 @@ export function contentTopicToPubsubTopic( const shardIndex = contentTopicToShardIndex(contentTopic, networkShards); return `/waku/2/rs/${clusterId}/${shardIndex}`; } + +/** + * Given an array of content topics, groups them together by their Pubsub topic as derived using the algorithm for autosharding. + * If any of the content topics are not properly formatted, the function will throw an error. + */ +export function contentTopicsByPubsubTopic( + contentTopics: string[], + clusterId: number = 1, + networkShards: number = 8 +): Map> { + const groupedContentTopics = new Map(); + for (const contentTopic of contentTopics) { + const pubsubTopic = contentTopicToPubsubTopic( + contentTopic, + clusterId, + networkShards + ); + let topics = groupedContentTopics.get(pubsubTopic); + if (!topics) { + groupedContentTopics.set(pubsubTopic, []); + topics = groupedContentTopics.get(pubsubTopic); + } + topics.push(contentTopic); + } + return groupedContentTopics; +} + +/** + * Used when creating encoders/decoders to determine which pubsub topic to use + */ +export function determinePubsubTopic( + contentTopic: string, + pubsubTopicShardInfo?: SingleShardInfo +): string { + return pubsubTopicShardInfo + ? pubsubTopicShardInfo.shard + ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) + : contentTopicToPubsubTopic(contentTopic, pubsubTopicShardInfo.clusterId) + : DefaultPubsubTopic; +}