From 2bc3735e4dcf85f06b3dee542024d7f20a40fac2 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 16 Nov 2023 15:17:17 +0300 Subject: [PATCH] feat: add support for autosharded pubsub topics tests: use a generator for sharded pubsub topics set pubsub topic in encoder/decoder based on sharding type add function for grouping content topics by pubsub topic add autosharding config to create options add autoshard rpc endpoints to nwaku and use in tests set autoshard pubsub topics in all protocols fix rebase with static sharding removes unused function remove console logs remove autosharding from ShardInfo, add to EncoderOptions fix enr and encoder/decoder options test that same application/version hashes to same shard index update comment on shard field fix spelling of autosharding fix content topic protocol in tests add sharding type alias and function to determine topic in encoders/decoders move DefaultPubsubTopic from core to interfaces --- packages/core/src/index.ts | 1 - packages/core/src/lib/base_protocol.ts | 6 +- packages/core/src/lib/filter/index.ts | 2 +- packages/core/src/lib/message/version_0.ts | 12 +- packages/core/src/lib/metadata/index.ts | 13 +- packages/core/src/lib/waku.ts | 7 +- .../src/lib => interfaces/src}/constants.ts | 0 packages/interfaces/src/index.ts | 1 + packages/interfaces/src/message.ts | 5 +- packages/interfaces/src/protocols.ts | 9 +- packages/message-encryption/src/ecies.ts | 11 +- packages/message-encryption/src/symmetric.ts | 11 +- packages/relay/src/index.ts | 2 +- packages/relay/src/topic_only_message.ts | 2 +- packages/sdk/src/create.ts | 4 +- packages/tests/src/message_collector.ts | 46 ++- packages/tests/src/node/node.ts | 37 +- .../tests/filter/multiple_pubsub.node.spec.ts | 176 +++++++++ packages/tests/tests/filter/ping.node.spec.ts | 2 +- packages/tests/tests/filter/push.node.spec.ts | 3 +- .../tests/tests/filter/subscribe.node.spec.ts | 8 +- .../tests/filter/unsubscribe.node.spec.ts | 3 +- packages/tests/tests/filter/utils.ts | 10 +- .../tests/tests/light-push/index.node.spec.ts | 9 +- .../light-push/multiple_pubsub.node.spec.ts | 166 ++++++++- packages/tests/tests/light-push/utils.ts | 18 +- .../tests/tests/relay/interop.node.spec.ts | 9 +- .../tests/relay/multiple_pubsub.node.spec.ts | 337 +++++++++++++++++- .../tests/tests/relay/subscribe.node.spec.ts | 4 +- .../tests/sharding/peer_management.spec.ts | 186 +++++++++- .../tests/sharding/running_nodes.spec.ts | 49 +++ .../tests/tests/store/cursor.node.spec.ts | 3 +- .../tests/store/error_handling.node.spec.ts | 2 +- packages/tests/tests/store/index.node.spec.ts | 9 +- .../tests/tests/store/multiple_pubsub.spec.ts | 156 +++++++- packages/tests/tests/store/order.node.spec.ts | 3 +- .../tests/tests/store/page_size.node.spec.ts | 2 +- .../tests/tests/store/sorting.node.spec.ts | 3 +- packages/tests/tests/store/utils.ts | 29 +- packages/tests/tests/utils.spec.ts | 10 +- .../tests/wait_for_remote_peer.node.spec.ts | 4 +- packages/utils/package.json | 2 +- packages/utils/src/common/sharding.spec.ts | 34 +- packages/utils/src/common/sharding.ts | 67 +++- 44 files changed, 1351 insertions(+), 122 deletions(-) rename packages/{core/src/lib => interfaces/src}/constants.ts (100%) diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 04e37bd19a..f51901a109 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -1,5 +1,4 @@ export { DefaultUserAgent } from "./lib/waku.js"; -export { DefaultPubsubTopic } from "./lib/constants.js"; export { createEncoder, createDecoder } from "./lib/message/version_0.js"; export type { Encoder, diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index f6fcd4663c..22d2ded86e 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -6,12 +6,12 @@ import type { IBaseProtocol, Libp2pComponents, PubsubTopic, - ShardInfo + ShardingParams } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { shardInfoToPubsubTopics } from "@waku/utils"; import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p"; -import { DefaultPubsubTopic } from "./constants.js"; import { filterPeers } from "./filterPeers.js"; import { StreamManager } from "./stream_manager.js"; @@ -97,7 +97,7 @@ export class BaseProtocol implements IBaseProtocol { return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers); } - initializePubsubTopic(shardInfo?: ShardInfo): PubsubTopic[] { + initializePubsubTopic(shardInfo?: ShardingParams): PubsubTopic[] { return shardInfo ? shardInfoToPubsubTopics(shardInfo) : [DefaultPubsubTopic]; diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 93c365dcd8..c54d1df39a 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -17,6 +17,7 @@ import type { SingleShardInfo, Unsubscribe } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { ensurePubsubTopicIsConfigured, @@ -30,7 +31,6 @@ import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; import { BaseProtocol } from "../base_protocol.js"; -import { DefaultPubsubTopic } from "../constants.js"; import { FilterPushRpc, diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index bb37c50358..a55b82059f 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -11,9 +11,7 @@ import type { SingleShardInfo } from "@waku/interfaces"; import { proto_message as proto } from "@waku/proto"; -import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; - -import { DefaultPubsubTopic } from "../constants.js"; +import { determinePubsubTopic, Logger } from "@waku/utils"; const log = new Logger("message:version-0"); const OneMillion = BigInt(1_000_000); @@ -128,9 +126,7 @@ export function createEncoder({ return new Encoder( contentTopic, ephemeral, - pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic, + determinePubsubTopic(contentTopic, pubsubTopicShardInfo), metaSetter ); } @@ -193,9 +189,7 @@ export function createDecoder( pubsubTopicShardInfo?: SingleShardInfo ): Decoder { return new Decoder( - pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic, + determinePubsubTopic(contentTopic, pubsubTopicShardInfo), contentTopic ); } diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index b3c6892ab0..9f95b5ba24 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -1,7 +1,12 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import { IncomingStreamData } from "@libp2p/interface/stream-handler"; import { encodeRelayShard } from "@waku/enr"; -import type { IMetadata, Libp2pComponents, ShardInfo } from "@waku/interfaces"; +import type { + IMetadata, + Libp2pComponents, + ShardInfo, + ShardingParams +} from "@waku/interfaces"; import { proto_metadata } from "@waku/proto"; import { Logger } from "@waku/utils"; import all from "it-all"; @@ -16,9 +21,9 @@ const log = new Logger("metadata"); export const MetadataCodec = "/vac/waku/metadata/1.0.0"; class Metadata extends BaseProtocol { - private readonly shardInfo: ShardInfo; + private readonly shardInfo: ShardingParams; private libp2pComponents: Libp2pComponents; - constructor(shardInfo: ShardInfo, libp2p: Libp2pComponents) { + constructor(shardInfo: ShardingParams, libp2p: Libp2pComponents) { super(MetadataCodec, libp2p.components); this.libp2pComponents = libp2p; this.shardInfo = shardInfo; @@ -99,7 +104,7 @@ class Metadata extends BaseProtocol { } export function wakuMetadata( - shardInfo: ShardInfo + shardInfo: ShardingParams ): (components: Libp2pComponents) => IMetadata { return (components: Libp2pComponents) => new Metadata(shardInfo, components); } diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index 4d6b7fa038..c452cf6db6 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -8,14 +8,13 @@ import type { IStore, Libp2p, PubsubTopic, - ShardInfo, + ShardingParams, Waku } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; +import { DefaultPubsubTopic, Protocols } from "@waku/interfaces"; import { Logger, shardInfoToPubsubTopics } from "@waku/utils"; import { ConnectionManager } from "./connection_manager.js"; -import { DefaultPubsubTopic } from "./constants.js"; export const DefaultPingKeepAliveValueSecs = 5 * 60; export const DefaultRelayKeepAliveValueSecs = 5 * 60; @@ -57,7 +56,7 @@ export class WakuNode implements Waku { constructor( options: WakuOptions, libp2p: Libp2p, - pubsubShardInfo?: ShardInfo, + pubsubShardInfo?: ShardingParams, store?: (libp2p: Libp2p) => IStore, lightPush?: (libp2p: Libp2p) => ILightPush, filter?: (libp2p: Libp2p) => IFilter, diff --git a/packages/core/src/lib/constants.ts b/packages/interfaces/src/constants.ts similarity index 100% rename from packages/core/src/lib/constants.ts rename to packages/interfaces/src/constants.ts diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 6ce8738a0d..03ff627af8 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -15,3 +15,4 @@ export * from "./libp2p.js"; export * from "./keep_alive_manager.js"; export * from "./dns_discovery.js"; export * from "./metadata.js"; +export * from "./constants.js"; diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index 40de174026..1a4dedeac9 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -2,7 +2,10 @@ import type { PubsubTopic } from "./misc.js"; export interface SingleShardInfo { clusterId: number; - shard: number; + /** + * Specifying this field indicates to the encoder/decoder that static sharding must be used. + */ + shard?: number; } export interface IRateLimitProof { diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 5a9d733274..33650580da 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -21,6 +21,13 @@ export interface IBaseProtocol { removeLibp2pEventListener: Libp2p["removeEventListener"]; } +export type ContentTopicInfo = { + clusterId: number; + contentTopics: string[]; +}; + +export type ShardingParams = ShardInfo | ContentTopicInfo; + export type ProtocolCreateOptions = { /** * Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future. @@ -39,7 +46,7 @@ export type ProtocolCreateOptions = { * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. * */ - shardInfo?: ShardInfo; + shardInfo?: ShardingParams; /** * You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property. * This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) diff --git a/packages/message-encryption/src/ecies.ts b/packages/message-encryption/src/ecies.ts index 24d7aa0fee..6a892034eb 100644 --- a/packages/message-encryption/src/ecies.ts +++ b/packages/message-encryption/src/ecies.ts @@ -1,4 +1,3 @@ -import { DefaultPubsubTopic } from "@waku/core"; import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0"; import type { EncoderOptions as BaseEncoderOptions, @@ -11,7 +10,7 @@ import type { SingleShardInfo } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; -import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; +import { determinePubsubTopic, Logger } from "@waku/utils"; import { generatePrivateKey } from "./crypto/utils.js"; import { DecodedMessage } from "./decoded_message.js"; @@ -107,9 +106,7 @@ export function createEncoder({ metaSetter }: EncoderOptions): Encoder { return new Encoder( - pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic, + determinePubsubTopic(contentTopic, pubsubTopicShardInfo), contentTopic, publicKey, sigPrivKey, @@ -200,9 +197,7 @@ export function createDecoder( pubsubTopicShardInfo?: SingleShardInfo ): Decoder { return new Decoder( - pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic, + determinePubsubTopic(contentTopic, pubsubTopicShardInfo), contentTopic, privateKey ); diff --git a/packages/message-encryption/src/symmetric.ts b/packages/message-encryption/src/symmetric.ts index 9854f5c665..841784db55 100644 --- a/packages/message-encryption/src/symmetric.ts +++ b/packages/message-encryption/src/symmetric.ts @@ -1,4 +1,3 @@ -import { DefaultPubsubTopic } from "@waku/core"; import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0"; import type { EncoderOptions as BaseEncoderOptions, @@ -11,7 +10,7 @@ import type { SingleShardInfo } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; -import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; +import { determinePubsubTopic, Logger } from "@waku/utils"; import { generateSymmetricKey } from "./crypto/utils.js"; import { DecodedMessage } from "./decoded_message.js"; @@ -107,9 +106,7 @@ export function createEncoder({ metaSetter }: EncoderOptions): Encoder { return new Encoder( - pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic, + determinePubsubTopic(contentTopic, pubsubTopicShardInfo), contentTopic, symKey, sigPrivKey, @@ -200,9 +197,7 @@ export function createDecoder( pubsubTopicShardInfo?: SingleShardInfo ): Decoder { return new Decoder( - pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic, + determinePubsubTopic(contentTopic, pubsubTopicShardInfo), contentTopic, symKey ); diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 8e7cce499e..26b5055722 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -9,7 +9,7 @@ import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; import type { PeerId } from "@libp2p/interface/peer-id"; import type { PubSub as Libp2pPubsub } from "@libp2p/interface/pubsub"; import { sha256 } from "@noble/hashes/sha256"; -import { DefaultPubsubTopic } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { ActiveSubscriptions, Callback, diff --git a/packages/relay/src/topic_only_message.ts b/packages/relay/src/topic_only_message.ts index cd0916fe2b..2335ee7320 100644 --- a/packages/relay/src/topic_only_message.ts +++ b/packages/relay/src/topic_only_message.ts @@ -1,4 +1,4 @@ -import { DefaultPubsubTopic } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import type { IDecodedMessage, IDecoder, diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index 11668689ee..cb96f5947c 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -23,7 +23,7 @@ import type { LightNode, ProtocolCreateOptions, RelayNode, - ShardInfo + ShardingParams } from "@waku/interfaces"; import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay"; @@ -180,7 +180,7 @@ type MetadataService = { }; export async function defaultLibp2p( - shardInfo?: ShardInfo, + shardInfo?: ShardingParams, wakuGossipSub?: PubsubService["pubsub"], options?: Partial, userAgent?: string diff --git a/packages/tests/src/message_collector.ts b/packages/tests/src/message_collector.ts index 91f2a80454..2c60b9f679 100644 --- a/packages/tests/src/message_collector.ts +++ b/packages/tests/src/message_collector.ts @@ -1,4 +1,5 @@ -import { DecodedMessage, DefaultPubsubTopic } from "@waku/core"; +import { DecodedMessage } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { AssertionError, expect } from "chai"; @@ -103,6 +104,49 @@ export class MessageCollector { } } + async waitForMessagesAutosharding( + numMessages: number, + options?: { + contentTopic: string; + timeoutDuration?: number; + exact?: boolean; + } + ): Promise { + const startTime = Date.now(); + const timeoutDuration = options?.timeoutDuration || 400; + const exact = options?.exact || false; + + while (this.count < numMessages) { + if (this.nwaku) { + try { + this.list = await this.nwaku.messagesAutosharding( + options!.contentTopic + ); + } catch (error) { + log.error(`Can't retrieve messages because of ${error}`); + await delay(10); + } + } + + if (Date.now() - startTime > timeoutDuration * numMessages) { + return false; + } + + await delay(10); + } + + if (exact) { + if (this.count == numMessages) { + return true; + } else { + log.warn(`Was expecting exactly ${numMessages} messages`); + return false; + } + } else { + return true; + } + } + // Verifies a received message against expected values. verifyReceivedMessage( index: number, diff --git a/packages/tests/src/node/node.ts b/packages/tests/src/node/node.ts index f14a0de279..54be7a908d 100644 --- a/packages/tests/src/node/node.ts +++ b/packages/tests/src/node/node.ts @@ -1,7 +1,7 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import { peerIdFromString } from "@libp2p/peer-id"; import { Multiaddr, multiaddr } from "@multiformats/multiaddr"; -import { DefaultPubsubTopic } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { isDefined } from "@waku/utils"; import { Logger } from "@waku/utils"; import { bytesToHex, hexToBytes } from "@waku/utils/bytes"; @@ -216,6 +216,16 @@ export class NimGoNode { ]); } + async ensureSubscriptionsAutosharding( + contentTopics: string[] + ): Promise { + this.checkProcess(); + + return this.rpcCall("post_waku_v2_relay_v1_auto_subscriptions", [ + contentTopics + ]); + } + async sendMessage( message: MessageRpcQuery, pubsubTopic: string = DefaultPubsubTopic @@ -232,6 +242,18 @@ export class NimGoNode { ]); } + async sendMessageAutosharding(message: MessageRpcQuery): Promise { + this.checkProcess(); + + if (typeof message.timestamp === "undefined") { + message.timestamp = BigInt(new Date().valueOf()) * OneMillion; + } + + return this.rpcCall("post_waku_v2_relay_v1_auto_message", [ + message + ]); + } + async messages( pubsubTopic: string = DefaultPubsubTopic ): Promise { @@ -245,6 +267,19 @@ export class NimGoNode { return msgs.filter(isDefined); } + async messagesAutosharding( + contentTopic: string + ): Promise { + this.checkProcess(); + + const msgs = await this.rpcCall( + "get_waku_v2_relay_v1_auto_messages", + [contentTopic] + ); + + return msgs.filter(isDefined); + } + async getAsymmetricKeyPair(): Promise { this.checkProcess(); diff --git a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts index 4be4cac9d3..9875e12d49 100644 --- a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts @@ -1,5 +1,6 @@ import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import type { + ContentTopicInfo, IFilterSubscription, LightNode, ShardInfo, @@ -7,6 +8,7 @@ import type { } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { + contentTopicToPubsubTopic, pubsubTopicToSingleShardInfo, singleShardInfoToPubsubTopic } from "@waku/utils"; @@ -178,3 +180,177 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { } }); }); + +describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(30000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + let subscription: IFilterSubscription; + let messageCollector: MessageCollector; + + const customContentTopic1 = "/waku/2/content/utf8"; + const customContentTopic2 = "/myapp/1/latest/proto"; + const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( + customContentTopic1, + 3 + ); + const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( + customContentTopic2, + 3 + ); + const contentTopicInfo: ContentTopicInfo = { + clusterId: 3, + contentTopics: [customContentTopic1, customContentTopic2] + }; + const customEncoder1 = createEncoder({ + contentTopic: customContentTopic1, + pubsubTopicShardInfo: { + clusterId: 3 + } + }); + const customDecoder1 = createDecoder(customContentTopic1, { clusterId: 3 }); + const customEncoder2 = createEncoder({ + contentTopic: customContentTopic2, + pubsubTopicShardInfo: { + clusterId: 3 + } + }); + const customDecoder2 = createDecoder(customContentTopic2, { clusterId: 3 }); + + this.beforeEach(async function () { + this.timeout(15000); + [nwaku, waku] = await runNodes( + this, + [autoshardingPubsubTopic1, autoshardingPubsubTopic2], + contentTopicInfo + ); + subscription = await waku.filter.createSubscription( + pubsubTopicToSingleShardInfo(autoshardingPubsubTopic1) + ); + messageCollector = new MessageCollector(); + }); + + this.afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); + }); + + it("Subscribe and receive messages on autosharded pubsubtopic", async function () { + await subscription.subscribe([customDecoder1], messageCollector.callback); + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 + }) + ).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: autoshardingPubsubTopic1, + expectedMessageText: "M1" + }); + }); + + it("Subscribe and receive messages on 2 different pubsubtopics", async function () { + await subscription.subscribe([customDecoder1], messageCollector.callback); + + // Subscribe from the same lightnode to the 2nd pubsubtopic + const subscription2 = await waku.filter.createSubscription( + pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) + ); + + const messageCollector2 = new MessageCollector(); + + await subscription2.subscribe([customDecoder2], messageCollector2.callback); + + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); + + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 + }) + ).to.eq(true); + expect( + await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic2 + }) + ).to.eq(true); + + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: autoshardingPubsubTopic1, + expectedMessageText: "M1" + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: autoshardingPubsubTopic2, + expectedMessageText: "M2" + }); + }); + + it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { + await subscription.subscribe([customDecoder1], messageCollector.callback); + + // Set up and start a new nwaku node with customPubsubTopic1 + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + pubsubTopic: [autoshardingPubsubTopic2] + }); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + + // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic + const subscription2 = await waku.filter.createSubscription( + pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2), + await nwaku2.getPeerId() + ); + await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); + + const messageCollector2 = new MessageCollector(); + + await subscription2.subscribe([customDecoder2], messageCollector2.callback); + + // Making sure that messages are send and reveiced for both subscriptions + // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 + while ( + !(await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 + })) || + !(await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic2 + })) + ) { + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); + } + + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: autoshardingPubsubTopic1, + expectedMessageText: "M1" + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: autoshardingPubsubTopic2, + expectedMessageText: "M2" + }); + }); + + it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () { + // this subscription object is set up with the `customPubsubTopic` but we're passing it a Decoder with the `DefaultPubsubTopic` + try { + await subscription.subscribe([customDecoder2], messageCollector.callback); + } catch (error) { + expect((error as Error).message).to.include( + "Pubsub topic not configured" + ); + } + }); +}); diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts index 2ec5cd8e1f..8a447f9360 100644 --- a/packages/tests/tests/filter/ping.node.spec.ts +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -1,4 +1,4 @@ -import { DefaultPubsubTopic } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import type { IFilterSubscription, LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index d58b373934..db72aba606 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -1,5 +1,6 @@ -import { DefaultPubsubTopic, waitForRemotePeer } from "@waku/core"; +import { waitForRemotePeer } from "@waku/core"; import type { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index 68b89e3d72..4e382a47e8 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -1,10 +1,6 @@ -import { - createDecoder, - createEncoder, - DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; +import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import type { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { ecies, diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts index f2d4a405df..2ec1588b29 100644 --- a/packages/tests/tests/filter/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -1,5 +1,6 @@ -import { createDecoder, createEncoder, DefaultPubsubTopic } from "@waku/core"; +import { createDecoder, createEncoder } from "@waku/core"; import type { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index d9cb4f0135..2541184101 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -1,14 +1,10 @@ +import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import { - createDecoder, - createEncoder, DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; -import { IFilterSubscription, LightNode, Protocols, - ShardInfo + ShardingParams } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { Logger } from "@waku/utils"; @@ -50,7 +46,7 @@ export async function runNodes( context: Context, //TODO: change this to use `ShardInfo` instead of `string[]` pubsubTopics: string[], - shardInfo?: ShardInfo + shardInfo?: ShardingParams ): Promise<[NimGoNode, LightNode]> { const nwaku = new NimGoNode(makeLogFileName(context)); diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index 7e569cba47..cf00748fb2 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -1,5 +1,10 @@ -import { createEncoder, DefaultPubsubTopic } from "@waku/core"; -import { IRateLimitProof, LightNode, SendError } from "@waku/interfaces"; +import { createEncoder } from "@waku/core"; +import { + DefaultPubsubTopic, + IRateLimitProof, + LightNode, + SendError +} from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts index c598122ab4..32d7bad791 100644 --- a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts @@ -1,13 +1,17 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import { createEncoder, waitForRemotePeer } from "@waku/core"; import { + ContentTopicInfo, LightNode, Protocols, SendResult, ShardInfo, SingleShardInfo } from "@waku/interfaces"; -import { singleShardInfoToPubsubTopic } from "@waku/utils"; +import { + contentTopicToPubsubTopic, + singleShardInfoToPubsubTopic +} from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -116,7 +120,7 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { messageCollector2.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, - expectedPubsubTopic: customPubsubTopic2 + expectedPubsubTopic: customPubsubTopic1 }); }); @@ -169,3 +173,161 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { }); }); }); + +describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { + this.timeout(30000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + let messageCollector: MessageCollector; + + // When using lightpush, we have to use a cluster id of 1 because that is the default cluster id for autosharding + // With a different cluster id, we never find a viable peer + const clusterId = 1; + const customContentTopic1 = "/waku/2/content/utf8"; + const customContentTopic2 = "/myapp/1/latest/proto"; + const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( + customContentTopic1, + clusterId + ); + const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( + customContentTopic2, + clusterId + ); + const contentTopicInfo: ContentTopicInfo = { + clusterId, + contentTopics: [customContentTopic1, customContentTopic2] + }; + const customEncoder1 = createEncoder({ + contentTopic: customContentTopic1, + pubsubTopicShardInfo: { + clusterId + } + }); + const customEncoder2 = createEncoder({ + contentTopic: customContentTopic2, + pubsubTopicShardInfo: { clusterId } + }); + + let nimPeerId: PeerId; + + this.beforeEach(async function () { + this.timeout(15000); + [nwaku, waku] = await runNodes( + this, + [autoshardingPubsubTopic1, autoshardingPubsubTopic2], + contentTopicInfo + ); + messageCollector = new MessageCollector(nwaku); + nimPeerId = await nwaku.getPeerId(); + }); + + this.afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); + }); + + it("Push message on custom pubsubTopic", async function () { + const pushResponse = await waku.lightPush.send(customEncoder1, { + payload: utf8ToBytes(messageText) + }); + + expect(pushResponse.errors).to.be.empty; + expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); + + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 + }) + ).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: customContentTopic1 + }); + }); + + it("Subscribe and receive messages on 2 different pubsubtopics", async function () { + const pushResponse1 = await waku.lightPush.send(customEncoder1, { + payload: utf8ToBytes("M1") + }); + const pushResponse2 = await waku.lightPush.send(customEncoder2, { + payload: utf8ToBytes("M2") + }); + expect(pushResponse1.recipients[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse2.recipients[0].toString()).to.eq(nimPeerId.toString()); + + const messageCollector2 = new MessageCollector(nwaku); + + expect( + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 + }) + ).to.eq(true); + + expect( + await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic2 + }) + ).to.eq(true); + + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: autoshardingPubsubTopic1 + }); + messageCollector2.verifyReceivedMessage(0, { + expectedMessageText: "M2", + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: autoshardingPubsubTopic2 + }); + }); + + it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { + // Set up and start a new nwaku node with Default PubsubTopic + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + pubsubTopic: [autoshardingPubsubTopic2] + }); + await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const messageCollector2 = new MessageCollector(nwaku2); + + let pushResponse1: SendResult; + let pushResponse2: SendResult; + // Making sure that we send messages to both nwaku nodes + // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 + while ( + !(await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 + })) || + !(await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic2 + })) || + pushResponse1!.recipients[0].toString() === + pushResponse2!.recipients[0].toString() + ) { + pushResponse1 = await waku.lightPush.send(customEncoder1, { + payload: utf8ToBytes("M1") + }); + pushResponse2 = await waku.lightPush.send(customEncoder2, { + payload: utf8ToBytes("M2") + }); + } + + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: autoshardingPubsubTopic1 + }); + messageCollector2.verifyReceivedMessage(0, { + expectedMessageText: "M2", + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: autoshardingPubsubTopic2 + }); + }); +}); diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index 4f30082c09..635d8e3989 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -1,9 +1,10 @@ +import { createEncoder, waitForRemotePeer } from "@waku/core"; import { - createEncoder, DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; -import { LightNode, Protocols, ShardInfo } from "@waku/interfaces"; + LightNode, + Protocols, + ShardingParams +} from "@waku/interfaces"; import { createLightNode, utf8ToBytes } from "@waku/sdk"; import { Logger } from "@waku/utils"; @@ -19,7 +20,7 @@ export const messagePayload = { payload: utf8ToBytes(messageText) }; export async function runNodes( context: Mocha.Context, pubsubTopics: string[], - shardInfo?: ShardInfo + shardInfo?: ShardingParams ): Promise<[NimGoNode, LightNode]> { const nwaku = new NimGoNode(makeLogFileName(context)); await nwaku.start( @@ -44,6 +45,13 @@ export async function runNodes( if (waku) { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.LightPush]); + if ( + shardInfo && + "contentTopics" in shardInfo && + shardInfo.contentTopics.length > 0 + ) { + await nwaku.ensureSubscriptionsAutosharding(shardInfo.contentTopics); + } await nwaku.ensureSubscriptions(pubsubTopics); return [nwaku, waku]; } else { diff --git a/packages/tests/tests/relay/interop.node.spec.ts b/packages/tests/tests/relay/interop.node.spec.ts index ff6030b5a6..47cf2eeeba 100644 --- a/packages/tests/tests/relay/interop.node.spec.ts +++ b/packages/tests/tests/relay/interop.node.spec.ts @@ -1,11 +1,6 @@ import type { PeerId } from "@libp2p/interface/peer-id"; -import { - DecodedMessage, - DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; -import { RelayNode } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; +import { DecodedMessage, waitForRemotePeer } from "@waku/core"; +import { DefaultPubsubTopic, Protocols, RelayNode } from "@waku/interfaces"; import { createRelayNode } from "@waku/sdk"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index a870f0cba2..f1cfaf886f 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -4,10 +4,18 @@ import { DecodedMessage, waitForRemotePeer } from "@waku/core"; -import { RelayNode, ShardInfo, SingleShardInfo } from "@waku/interfaces"; +import { + ContentTopicInfo, + RelayNode, + ShardInfo, + SingleShardInfo +} from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createRelayNode } from "@waku/sdk"; -import { singleShardInfoToPubsubTopic } from "@waku/utils"; +import { + contentTopicToPubsubTopic, + singleShardInfoToPubsubTopic +} from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -305,3 +313,328 @@ describe("Waku Relay, multiple pubsub topics", function () { expect(waku2ReceivedMsg.pubsubTopic).to.eq(customPubsubTopic1); }); }); + +describe("Waku Relay (Autosharding), multiple pubsub topics", function () { + this.timeout(15000); + let waku1: RelayNode; + let waku2: RelayNode; + let waku3: RelayNode; + + const customContentTopic1 = "/waku/2/content/utf8"; + const customContentTopic2 = "/myapp/1/latest/proto"; + const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( + customContentTopic1, + 3 + ); + const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( + customContentTopic2, + 3 + ); + const contentTopicInfo1: ContentTopicInfo = { + clusterId: 3, + contentTopics: [customContentTopic1] + }; + const contentTopicInfo2: ContentTopicInfo = { + clusterId: 3, + contentTopics: [customContentTopic2] + }; + const customEncoder1 = createEncoder({ + contentTopic: customContentTopic1, + pubsubTopicShardInfo: { + clusterId: 3 + } + }); + const customDecoder1 = createDecoder(customContentTopic1, { clusterId: 3 }); + const customEncoder2 = createEncoder({ + contentTopic: customContentTopic2, + pubsubTopicShardInfo: { clusterId: 3 } + }); + const customDecoder2 = createDecoder(customContentTopic2, { clusterId: 3 }); + const contentTopicInfoBothShards: ContentTopicInfo = { + clusterId: 3, + contentTopics: [customContentTopic1, customContentTopic2] + }; + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([], [waku1, waku2, waku3]); + }); + + [ + { + pubsub: autoshardingPubsubTopic1, + shardInfo: contentTopicInfo1, + encoder: customEncoder1, + decoder: customDecoder1 + }, + { + pubsub: autoshardingPubsubTopic2, + shardInfo: contentTopicInfo2, + encoder: customEncoder2, + decoder: customDecoder2 + } + ].forEach((testItem) => { + it(`3 nodes on ${testItem.pubsub} topic`, async function () { + const [msgCollector1, msgCollector2, msgCollector3] = Array(3) + .fill(null) + .map(() => new MessageCollector()); + + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + shardInfo: testItem.shardInfo, + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + shardInfo: testItem.shardInfo, + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + shardInfo: testItem.shardInfo, + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), + waitForRemotePeer(waku3, [Protocols.Relay]) + ]); + + await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback); + await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback); + await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback); + + // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network + const relayResponse1 = await waku1.relay.send(testItem.encoder, { + payload: utf8ToBytes("M1") + }); + const relayResponse2 = await waku2.relay.send(testItem.encoder, { + payload: utf8ToBytes("M2") + }); + const relayResponse3 = await waku3.relay.send(testItem.encoder, { + payload: utf8ToBytes("M3") + }); + + expect(relayResponse1.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(relayResponse3.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + waku1.libp2p.peerId.toString() + ); + expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + waku3.libp2p.peerId.toString() + ); + + expect( + await msgCollector1.waitForMessagesAutosharding(2, { + contentTopic: testItem.encoder.contentTopic, + exact: true + }) + ).to.eq(true); + expect( + await msgCollector2.waitForMessagesAutosharding(2, { + contentTopic: testItem.encoder.contentTopic, + exact: true + }) + ).to.eq(true); + expect( + await msgCollector3.waitForMessagesAutosharding(2, { + contentTopic: testItem.encoder.contentTopic, + exact: true + }) + ).to.eq(true); + + expect( + msgCollector1.hasMessage(testItem.encoder.contentTopic, "M2") + ).to.eq(true); + expect( + msgCollector1.hasMessage(testItem.encoder.contentTopic, "M3") + ).to.eq(true); + expect( + msgCollector2.hasMessage(testItem.encoder.contentTopic, "M1") + ).to.eq(true); + expect( + msgCollector2.hasMessage(testItem.encoder.contentTopic, "M3") + ).to.eq(true); + expect( + msgCollector3.hasMessage(testItem.encoder.contentTopic, "M1") + ).to.eq(true); + expect( + msgCollector3.hasMessage(testItem.encoder.contentTopic, "M2") + ).to.eq(true); + }); + }); + + it("Nodes with multiple pubsub topic", async function () { + const [msgCollector1, msgCollector2, msgCollector3] = Array(3) + .fill(null) + .map(() => new MessageCollector()); + + // Waku1 and waku2 are using multiple pubsub topis + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + shardInfo: contentTopicInfoBothShards, + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + shardInfo: contentTopicInfoBothShards, + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + shardInfo: contentTopicInfo1, + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), + waitForRemotePeer(waku3, [Protocols.Relay]) + ]); + + await waku1.relay.subscribe( + [customDecoder1, customDecoder2], + msgCollector1.callback + ); + await waku2.relay.subscribe( + [customDecoder1, customDecoder2], + msgCollector2.callback + ); + await waku3.relay.subscribe([customDecoder1], msgCollector3.callback); + + // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network + // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic + await waku1.relay.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku1.relay.send(customEncoder2, { payload: utf8ToBytes("M2") }); + await waku2.relay.send(customEncoder1, { payload: utf8ToBytes("M3") }); + await waku2.relay.send(customEncoder2, { payload: utf8ToBytes("M4") }); + await waku3.relay.send(customEncoder1, { payload: utf8ToBytes("M5") }); + await waku3.relay.send(customEncoder2, { payload: utf8ToBytes("M6") }); + + expect( + await msgCollector1.waitForMessagesAutosharding(3, { + contentTopic: customContentTopic1, + exact: true + }) + ).to.eq(true); + expect( + await msgCollector1.waitForMessagesAutosharding(3, { + contentTopic: customContentTopic2, + exact: true + }) + ).to.eq(true); + expect( + await msgCollector2.waitForMessagesAutosharding(3, { + contentTopic: customContentTopic1, + exact: true + }) + ).to.eq(true); + expect( + await msgCollector2.waitForMessagesAutosharding(3, { + contentTopic: customContentTopic2, + exact: true + }) + ).to.eq(true); + expect( + await msgCollector3.waitForMessagesAutosharding(2, { + contentTopic: customContentTopic1, + exact: true + }) + ).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic1, "M3")).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic2, "M4")).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic1, "M5")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic1, "M1")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic2, "M2")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic1, "M5")).to.eq(true); + expect(msgCollector3.hasMessage(customContentTopic1, "M1")).to.eq(true); + expect(msgCollector3.hasMessage(customContentTopic1, "M3")).to.eq(true); + }); + + it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () { + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + shardInfo: contentTopicInfo1, + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + shardInfo: contentTopicInfo1, + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]) + ]); + + const messageText = "Communicating using a custom pubsub topic"; + + const waku2ReceivedMsgPromise: Promise = new Promise( + (resolve) => { + void waku2.relay.subscribe([customDecoder1], resolve); + } + ); + + // The promise **fails** if we receive a message on the default + // pubsub topic. + const waku3NoMsgPromise: Promise = new Promise( + (resolve, reject) => { + void waku3.relay.subscribe([TestDecoder], reject); + setTimeout(resolve, 1000); + } + ); + + await waku1.relay.send(customEncoder1, { + payload: utf8ToBytes(messageText) + }); + + const waku2ReceivedMsg = await waku2ReceivedMsgPromise; + await waku3NoMsgPromise; + + expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText); + expect(waku2ReceivedMsg.pubsubTopic).to.eq(autoshardingPubsubTopic1); + }); +}); diff --git a/packages/tests/tests/relay/subscribe.node.spec.ts b/packages/tests/tests/relay/subscribe.node.spec.ts index d2c5fd59b9..bb67659cf3 100644 --- a/packages/tests/tests/relay/subscribe.node.spec.ts +++ b/packages/tests/tests/relay/subscribe.node.spec.ts @@ -1,5 +1,5 @@ -import { createDecoder, createEncoder, DefaultPubsubTopic } from "@waku/core"; -import { RelayNode } from "@waku/interfaces"; +import { createDecoder, createEncoder } from "@waku/core"; +import { DefaultPubsubTopic, RelayNode } from "@waku/interfaces"; import { createRelayNode } from "@waku/sdk"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index 833795eb18..fcaacf097b 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -1,8 +1,17 @@ import { bootstrap } from "@libp2p/bootstrap"; import type { PeerId } from "@libp2p/interface/peer-id"; import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; -import { createLightNode, LightNode, ShardInfo, Tags } from "@waku/sdk"; -import { singleShardInfoToPubsubTopic } from "@waku/utils"; +import { + ContentTopicInfo, + createLightNode, + LightNode, + ShardInfo, + Tags +} from "@waku/sdk"; +import { + contentTopicToPubsubTopic, + singleShardInfoToPubsubTopic +} from "@waku/utils"; import chai, { expect } from "chai"; import chaiAsPromised from "chai-as-promised"; import Sinon, { SinonSpy } from "sinon"; @@ -184,3 +193,176 @@ describe("Static Sharding: Peer Management", function () { }); }); }); + +describe("Autosharding: Peer Management", function () { + const ContentTopic = "/waku/2/content/test.js"; + + describe("Peer Exchange", function () { + let waku: LightNode; + let nwaku1: NimGoNode; + let nwaku2: NimGoNode; + let nwaku3: NimGoNode; + + let dialPeerSpy: SinonSpy; + + beforeEach(async function () { + this.timeout(15000); + nwaku1 = new NimGoNode(makeLogFileName(this) + "1_auto"); + nwaku2 = new NimGoNode(makeLogFileName(this) + "2_auto"); + nwaku3 = new NimGoNode(makeLogFileName(this) + "3_auto"); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku1, nwaku2, nwaku3], waku); + dialPeerSpy && dialPeerSpy.restore(); + }); + + it("all px service nodes subscribed to the shard topic should be dialed", async function () { + this.timeout(100_000); + + const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, 1)]; + const contentTopicInfo: ContentTopicInfo = { + clusterId: 1, + contentTopics: [ContentTopic] + }; + + await nwaku1.start({ + pubsubTopic: pubsubTopics, + discv5Discovery: true, + peerExchange: true, + relay: true + }); + + const enr1 = (await nwaku1.info()).enrUri; + + await nwaku2.start({ + pubsubTopic: pubsubTopics, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: enr1, + relay: true + }); + + const enr2 = (await nwaku2.info()).enrUri; + + await nwaku3.start({ + pubsubTopic: pubsubTopics, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: enr2, + relay: true + }); + const nwaku3Ma = await nwaku3.getMultiaddrWithId(); + + waku = await createLightNode({ + shardInfo: contentTopicInfo, + libp2p: { + peerDiscovery: [ + bootstrap({ list: [nwaku3Ma.toString()] }), + wakuPeerExchangeDiscovery() + ] + } + }); + + await waku.start(); + + dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer"); + + const pxPeersDiscovered = new Set(); + + await new Promise((resolve) => { + waku.libp2p.addEventListener("peer:discovery", (evt) => { + return void (async () => { + const peerId = evt.detail.id; + const peer = await waku.libp2p.peerStore.get(peerId); + const tags = Array.from(peer.tags.keys()); + if (tags.includes(Tags.PEER_EXCHANGE)) { + pxPeersDiscovered.add(peerId); + if (pxPeersDiscovered.size === 2) { + resolve(); + } + } + })(); + }); + }); + + await delay(1000); + + expect(dialPeerSpy.callCount).to.equal(3); + }); + + it("px service nodes not subscribed to the shard should not be dialed", async function () { + this.timeout(100_000); + const pubsubTopicsToDial = [contentTopicToPubsubTopic(ContentTopic, 1)]; + const contentTopicInfoToDial: ContentTopicInfo = { + clusterId: 1, + contentTopics: [ContentTopic] + }; + const pubsubTopicsToIgnore = [contentTopicToPubsubTopic(ContentTopic, 2)]; + + // this service node is not subscribed to the shard + await nwaku1.start({ + pubsubTopic: pubsubTopicsToIgnore, + relay: true, + discv5Discovery: true, + peerExchange: true + }); + + const enr1 = (await nwaku1.info()).enrUri; + + await nwaku2.start({ + pubsubTopic: pubsubTopicsToDial, + relay: true, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: enr1 + }); + + const enr2 = (await nwaku2.info()).enrUri; + + await nwaku3.start({ + relay: true, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: enr2 + }); + const nwaku3Ma = await nwaku3.getMultiaddrWithId(); + + waku = await createLightNode({ + shardInfo: contentTopicInfoToDial, + libp2p: { + peerDiscovery: [ + bootstrap({ list: [nwaku3Ma.toString()] }), + wakuPeerExchangeDiscovery() + ] + } + }); + + dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer"); + + await waku.start(); + + const pxPeersDiscovered = new Set(); + + await new Promise((resolve) => { + waku.libp2p.addEventListener("peer:discovery", (evt) => { + return void (async () => { + const peerId = evt.detail.id; + const peer = await waku.libp2p.peerStore.get(peerId); + const tags = Array.from(peer.tags.keys()); + if (tags.includes(Tags.PEER_EXCHANGE)) { + pxPeersDiscovered.add(peerId); + if (pxPeersDiscovered.size === 1) { + resolve(); + } + } + })(); + }); + }); + + await delay(1000); + expect(dialPeerSpy.callCount).to.equal(2); + }); + }); +}); diff --git a/packages/tests/tests/sharding/running_nodes.spec.ts b/packages/tests/tests/sharding/running_nodes.spec.ts index 043c0db423..41cbaefa86 100644 --- a/packages/tests/tests/sharding/running_nodes.spec.ts +++ b/packages/tests/tests/sharding/running_nodes.spec.ts @@ -20,6 +20,7 @@ const shardInfoBothShards: ShardInfo = { clusterId: 0, shards: [2, 3] }; const singleShardInfo1: SingleShardInfo = { clusterId: 0, shard: 2 }; const singleShardInfo2: SingleShardInfo = { clusterId: 0, shard: 3 }; const ContentTopic = "/waku/2/content/test.js"; +const ContentTopic2 = "/myapp/1/latest/proto"; describe("Static Sharding: Running Nodes", () => { let waku: LightNode; @@ -93,3 +94,51 @@ describe("Static Sharding: Running Nodes", () => { } }); }); + +describe("Autosharding: Running Nodes", () => { + let waku: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15_000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.start({ store: true, lightpush: true, relay: true }); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes(nwaku, waku); + }); + + it("configure the node with multiple pubsub topics", async function () { + this.timeout(15_000); + waku = await createLightNode({ + shardInfo: { + ...shardInfoBothShards, + // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards + contentTopics: [ContentTopic, ContentTopic2] + } + }); + + const encoder1 = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { clusterId: 0 } + }); + + const encoder2 = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { clusterId: 0 } + }); + + const request1 = await waku.lightPush.send(encoder1, { + payload: utf8ToBytes("Hello World") + }); + + const request2 = await waku.lightPush.send(encoder2, { + payload: utf8ToBytes("Hello World") + }); + + expect(request1.recipients.length).to.eq(0); + expect(request2.recipients.length).to.eq(0); + }); +}); diff --git a/packages/tests/tests/store/cursor.node.spec.ts b/packages/tests/tests/store/cursor.node.spec.ts index cddf6602af..55bfe97d2a 100644 --- a/packages/tests/tests/store/cursor.node.spec.ts +++ b/packages/tests/tests/store/cursor.node.spec.ts @@ -1,5 +1,6 @@ -import { createCursor, DecodedMessage, DefaultPubsubTopic } from "@waku/core"; +import { createCursor, DecodedMessage } from "@waku/core"; import type { LightNode } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { bytesToUtf8 } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/store/error_handling.node.spec.ts b/packages/tests/tests/store/error_handling.node.spec.ts index e18f9bcc8f..148f76a6a8 100644 --- a/packages/tests/tests/store/error_handling.node.spec.ts +++ b/packages/tests/tests/store/error_handling.node.spec.ts @@ -1,4 +1,4 @@ -import { DefaultPubsubTopic } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { IMessage, type LightNode } from "@waku/interfaces"; import { expect } from "chai"; diff --git a/packages/tests/tests/store/index.node.spec.ts b/packages/tests/tests/store/index.node.spec.ts index c2347e54e0..c5e2c8e127 100644 --- a/packages/tests/tests/store/index.node.spec.ts +++ b/packages/tests/tests/store/index.node.spec.ts @@ -1,11 +1,6 @@ -import { - createDecoder, - DecodedMessage, - DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; +import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core"; import type { IMessage, LightNode } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; +import { DefaultPubsubTopic, Protocols } from "@waku/interfaces"; import { generatePrivateKey, generateSymmetricKey, diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index d476ebf0b3..e8c3fc9665 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -1,6 +1,7 @@ -import { waitForRemotePeer } from "@waku/core"; -import type { IMessage, LightNode } from "@waku/interfaces"; +import { createDecoder, waitForRemotePeer } from "@waku/core"; +import type { ContentTopicInfo, IMessage, LightNode } from "@waku/interfaces"; import { createLightNode, Protocols } from "@waku/sdk"; +import { contentTopicToPubsubTopic } from "@waku/utils"; import { expect } from "chai"; import { @@ -19,6 +20,7 @@ import { customShardedPubsubTopic2, processQueriedMessages, sendMessages, + sendMessagesAutosharding, shardInfo1, shardInfoBothShards, startAndConnectLightNode, @@ -169,3 +171,153 @@ describe("Waku Store, custom pubsub topic", function () { } }); }); + +describe("Waku Store (Autosharding), custom pubsub topic", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + + const customContentTopic1 = "/waku/2/content/utf8"; + const customContentTopic2 = "/myapp/1/latest/proto"; + const clusterId = 1; + const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( + customContentTopic1, + clusterId + ); + const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( + customContentTopic2, + clusterId + ); + const contentTopicInfo1: ContentTopicInfo = { + clusterId, + contentTopics: [customContentTopic1] + }; + const customDecoder1 = createDecoder(customContentTopic1, { + clusterId + }); + const customDecoder2 = createDecoder(customContentTopic2, { + clusterId + }); + const contentTopicInfoBothShards: ContentTopicInfo = { + clusterId, + contentTopics: [customContentTopic1, customContentTopic2] + }; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.start({ + store: true, + pubsubTopic: [autoshardingPubsubTopic1, autoshardingPubsubTopic2], + relay: true + }); + await nwaku.ensureSubscriptionsAutosharding([ + customContentTopic1, + customContentTopic2 + ]); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); + }); + + it("Generator, custom pubsub topic", async function () { + await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); + waku = await startAndConnectLightNode(nwaku, [], contentTopicInfo1); + const messages = await processQueriedMessages( + waku, + [customDecoder1], + autoshardingPubsubTopic1 + ); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result).to.not.eq(-1); + }); + + it("Generator, 2 different pubsubtopics", async function () { + this.timeout(10000); + + const totalMsgs = 10; + await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); + await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic2); + + waku = await startAndConnectLightNode( + nwaku, + [], + contentTopicInfoBothShards + ); + + const customMessages = await processQueriedMessages( + waku, + [customDecoder1], + autoshardingPubsubTopic1 + ); + expect(customMessages?.length).eq(totalMsgs); + const result1 = customMessages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result1).to.not.eq(-1); + + const testMessages = await processQueriedMessages( + waku, + [customDecoder2], + autoshardingPubsubTopic2 + ); + expect(testMessages?.length).eq(totalMsgs); + const result2 = testMessages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result2).to.not.eq(-1); + }); + + it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () { + this.timeout(10000); + + // Set up and start a new nwaku node with Default Pubsubtopic + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + store: true, + pubsubTopic: [autoshardingPubsubTopic2], + relay: true + }); + await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); + + const totalMsgs = 10; + await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1); + await sendMessagesAutosharding(nwaku2, totalMsgs, customContentTopic2); + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1, + shardInfo: contentTopicInfoBothShards + }); + await waku.start(); + + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + let customMessages: IMessage[] = []; + let testMessages: IMessage[] = []; + + while ( + customMessages.length != totalMsgs || + testMessages.length != totalMsgs + ) { + customMessages = await processQueriedMessages( + waku, + [customDecoder1], + autoshardingPubsubTopic1 + ); + testMessages = await processQueriedMessages( + waku, + [customDecoder2], + autoshardingPubsubTopic2 + ); + } + }); +}); diff --git a/packages/tests/tests/store/order.node.spec.ts b/packages/tests/tests/store/order.node.spec.ts index f4836f51db..9701807a05 100644 --- a/packages/tests/tests/store/order.node.spec.ts +++ b/packages/tests/tests/store/order.node.spec.ts @@ -1,5 +1,6 @@ -import { DecodedMessage, DefaultPubsubTopic, PageDirection } from "@waku/core"; +import { DecodedMessage, PageDirection } from "@waku/core"; import type { IMessage, LightNode } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { expect } from "chai"; import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js"; diff --git a/packages/tests/tests/store/page_size.node.spec.ts b/packages/tests/tests/store/page_size.node.spec.ts index 60092fd1f4..c4bfee0757 100644 --- a/packages/tests/tests/store/page_size.node.spec.ts +++ b/packages/tests/tests/store/page_size.node.spec.ts @@ -1,4 +1,4 @@ -import { DefaultPubsubTopic } from "@waku/core"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import type { LightNode } from "@waku/interfaces"; import { expect } from "chai"; diff --git a/packages/tests/tests/store/sorting.node.spec.ts b/packages/tests/tests/store/sorting.node.spec.ts index a403febf69..7fa3ca6410 100644 --- a/packages/tests/tests/store/sorting.node.spec.ts +++ b/packages/tests/tests/store/sorting.node.spec.ts @@ -1,5 +1,6 @@ -import { DecodedMessage, DefaultPubsubTopic, PageDirection } from "@waku/core"; +import { DecodedMessage, PageDirection } from "@waku/core"; import type { IMessage, LightNode } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js"; diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts index 4a52290f66..76ce57f09f 100644 --- a/packages/tests/tests/store/utils.ts +++ b/packages/tests/tests/store/utils.ts @@ -3,10 +3,15 @@ import { createEncoder, DecodedMessage, Decoder, - DefaultPubsubTopic, waitForRemotePeer } from "@waku/core"; -import { LightNode, Protocols, ShardInfo } from "@waku/interfaces"; +import { + DefaultPubsubTopic, + LightNode, + Protocols, + ShardInfo, + ShardingParams +} from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; import { expect } from "chai"; @@ -61,6 +66,24 @@ export async function sendMessages( } } +export async function sendMessagesAutosharding( + instance: NimGoNode, + numMessages: number, + contentTopic: string +): Promise { + for (let i = 0; i < numMessages; i++) { + expect( + await instance.sendMessageAutosharding( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: contentTopic + }) + ) + ).to.eq(true); + await delay(1); // to ensure each timestamp is unique. + } +} + export async function processQueriedMessages( instance: LightNode, decoders: Array, @@ -81,7 +104,7 @@ export async function processQueriedMessages( export async function startAndConnectLightNode( instance: NimGoNode, pubsubTopics: string[] = [DefaultPubsubTopic], - shardInfo?: ShardInfo + shardInfo?: ShardingParams ): Promise { const waku = await createLightNode({ ...((pubsubTopics.length !== 1 || diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index eab0bb35d4..ecb1f02bf5 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -1,14 +1,8 @@ import type { PeerStore } from "@libp2p/interface/peer-store"; import type { Peer } from "@libp2p/interface/peer-store"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; -import { - createDecoder, - createEncoder, - DefaultPubsubTopic, - waitForRemotePeer -} from "@waku/core"; -import { LightNode } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; +import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; +import { DefaultPubsubTopic, LightNode, Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { toAsyncIterator } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index cd33cb78fb..586d2574c0 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -1,6 +1,6 @@ -import { DefaultPubsubTopic, waitForRemotePeer } from "@waku/core"; +import { waitForRemotePeer } from "@waku/core"; import type { LightNode, RelayNode } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; +import { DefaultPubsubTopic, Protocols } from "@waku/interfaces"; import { createLightNode, createRelayNode } from "@waku/sdk"; import { expect } from "chai"; diff --git a/packages/utils/package.json b/packages/utils/package.json index 7a22787da9..3af685e546 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -68,6 +68,7 @@ }, "dependencies": { "@noble/hashes": "^1.3.2", + "@waku/interfaces": "0.0.20", "chai": "^4.3.10", "debug": "^4.3.4", "uint8arrays": "^4.0.4" @@ -77,7 +78,6 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.2.3", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.20", "cspell": "^7.3.2", "npm-run-all": "^4.1.5", "rollup": "^4.6.0" diff --git a/packages/utils/src/common/sharding.spec.ts b/packages/utils/src/common/sharding.spec.ts index fd206dd788..06484615ac 100644 --- a/packages/utils/src/common/sharding.spec.ts +++ b/packages/utils/src/common/sharding.spec.ts @@ -1,6 +1,11 @@ import { expect } from "chai"; -import { contentTopicToShardIndex, ensureValidContentTopic } from "./sharding"; +import { + contentTopicsByPubsubTopic, + contentTopicToPubsubTopic, + contentTopicToShardIndex, + ensureValidContentTopic +} from "./sharding"; const testInvalidCases = ( contentTopics: string[], @@ -91,10 +96,35 @@ describe("contentTopicToShardIndex", () => { it("converts content topics to expected shard index", () => { const contentTopics: [string, number][] = [ ["/toychat/2/huilong/proto", 3], - ["/myapp/1/latest/proto", 0] + ["/myapp/1/latest/proto", 0], + ["/waku/2/content/test.js", 1] ]; for (const [topic, shard] of contentTopics) { expect(contentTopicToShardIndex(topic)).to.eq(shard); } }); + + it("topics with same application and version share the same shard", () => { + const contentTopics: [string, string][] = [ + ["/toychat/2/huilong/proto", "/toychat/2/othertopic/otherencoding"], + ["/myapp/1/latest/proto", "/myapp/1/new/proto"], + ["/waku/2/content/test.js", "/waku/2/users/proto"] + ]; + for (const [topic1, topic2] of contentTopics) { + expect(contentTopicToShardIndex(topic1)).to.eq( + contentTopicToShardIndex(topic2) + ); + } + }); +}); + +describe("contentTopicsByPubsubTopic", () => { + it("groups content topics by expected pubsub topic", () => { + const contentTopics = ["/toychat/2/huilong/proto", "/myapp/1/latest/proto"]; + const grouped = contentTopicsByPubsubTopic(contentTopics); + for (const contentTopic of contentTopics) { + const pubsubTopic = contentTopicToPubsubTopic(contentTopic); + expect(grouped.get(pubsubTopic)?.includes(contentTopic)).to.be.true; + } + }); }); diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index 648cb72b17..f4a8afb71b 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -1,5 +1,10 @@ import { sha256 } from "@noble/hashes/sha256"; -import type { PubsubTopic, ShardInfo, SingleShardInfo } from "@waku/interfaces"; +import { + DefaultPubsubTopic, + PubsubTopic, + ShardingParams, + SingleShardInfo +} from "@waku/interfaces"; import { concat, utf8ToBytes } from "../bytes/index.js"; @@ -13,14 +18,20 @@ export const singleShardInfoToPubsubTopic = ( }; export const shardInfoToPubsubTopics = ( - shardInfo: ShardInfo + shardInfo: ShardingParams ): PubsubTopic[] => { - if (shardInfo.clusterId === undefined || shardInfo.shards === undefined) - throw new Error("Invalid shard"); - - return shardInfo.shards.map( - (index) => `/waku/2/rs/${shardInfo.clusterId}/${index}` - ); + if (shardInfo.clusterId === undefined) + throw new Error("Cluster ID must be specified"); + if ("contentTopics" in shardInfo) { + return shardInfo.contentTopics.map((contentTopic) => + contentTopicToPubsubTopic(contentTopic, shardInfo.clusterId) + ); + } else { + if (shardInfo.shards === undefined) throw new Error("Invalid shard"); + return shardInfo.shards.map( + (index) => `/waku/2/rs/${shardInfo.clusterId}/${index}` + ); + } }; export const pubsubTopicToSingleShardInfo = ( @@ -140,3 +151,43 @@ export function contentTopicToPubsubTopic( const shardIndex = contentTopicToShardIndex(contentTopic, networkShards); return `/waku/2/rs/${clusterId}/${shardIndex}`; } + +/** + * Given an array of content topics, groups them together by their Pubsub topic as derived using the algorithm for autosharding. + * If any of the content topics are not properly formatted, the function will throw an error. + */ +export function contentTopicsByPubsubTopic( + contentTopics: string[], + clusterId: number = 1, + networkShards: number = 8 +): Map> { + const groupedContentTopics = new Map(); + for (const contentTopic of contentTopics) { + const pubsubTopic = contentTopicToPubsubTopic( + contentTopic, + clusterId, + networkShards + ); + let topics = groupedContentTopics.get(pubsubTopic); + if (!topics) { + groupedContentTopics.set(pubsubTopic, []); + topics = groupedContentTopics.get(pubsubTopic); + } + topics.push(contentTopic); + } + return groupedContentTopics; +} + +/** + * Used when creating encoders/decoders to determine which pubsub topic to use + */ +export function determinePubsubTopic( + contentTopic: string, + pubsubTopicShardInfo?: SingleShardInfo +): string { + return pubsubTopicShardInfo + ? pubsubTopicShardInfo.shard + ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) + : contentTopicToPubsubTopic(contentTopic, pubsubTopicShardInfo.clusterId) + : DefaultPubsubTopic; +}