diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 64dd435d7a..8ce6162a50 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -3,10 +3,9 @@ import type { Peer, PeerStore, Stream } from "@libp2p/interface"; import type { IBaseProtocolCore, Libp2pComponents, - ProtocolCreateOptions, PubsubTopic } from "@waku/interfaces"; -import { ensureShardingConfigured, Logger } from "@waku/utils"; +import { Logger, pubsubTopicsToShardInfo } from "@waku/utils"; import { getConnectedPeersForProtocolAndShard, getPeersForProtocol, @@ -29,8 +28,7 @@ export class BaseProtocol implements IBaseProtocolCore { public multicodec: string, private components: Libp2pComponents, private log: Logger, - public readonly pubsubTopics: PubsubTopic[], - private options?: ProtocolCreateOptions + public readonly pubsubTopics: PubsubTopic[] ) { this.addLibp2pEventListener = components.events.addEventListener.bind( components.events @@ -100,9 +98,7 @@ export class BaseProtocol implements IBaseProtocolCore { this.components.connectionManager.getConnections(), this.peerStore, [this.multicodec], - this.options?.shardInfo - ? ensureShardingConfigured(this.options.shardInfo).shardInfo - : undefined + pubsubTopicsToShardInfo(this.pubsubTopics) ); // Filter the peers based on discovery & number of peers requested diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 6b23713eb5..3112f32d5f 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -173,7 +173,7 @@ export class ConnectionManager private constructor( libp2p: Libp2p, keepAliveOptions: KeepAliveOptions, - private configuredPubsubTopics: PubsubTopic[], + public readonly configuredPubsubTopics: PubsubTopic[], relay?: IRelay, options?: Partial ) { diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 97ba6d4fe8..e4b64218fa 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -5,7 +5,6 @@ import { type CoreProtocolResult, type IBaseProtocolCore, type Libp2p, - type ProtocolCreateOptions, ProtocolError, type PubsubTopic } from "@waku/interfaces"; @@ -38,16 +37,10 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { wakuMessage: WakuMessage, peerIdStr: string ) => Promise, - libp2p: Libp2p, - options?: ProtocolCreateOptions + public readonly pubsubTopics: PubsubTopic[], + libp2p: Libp2p ) { - super( - FilterCodecs.SUBSCRIBE, - libp2p.components, - log, - options!.pubsubTopics!, - options - ); + super(FilterCodecs.SUBSCRIBE, libp2p.components, log, pubsubTopics); libp2p .handle(FilterCodecs.PUSH, this.onRequest.bind(this), { diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index ec899d40e5..6e89da610d 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -5,8 +5,8 @@ import { type IEncoder, type IMessage, type Libp2p, - type ProtocolCreateOptions, ProtocolError, + PubsubTopic, type ThisOrThat } from "@waku/interfaces"; import { PushResponse } from "@waku/proto"; @@ -32,14 +32,11 @@ type PreparePushMessageResult = ThisOrThat<"query", PushRpc>; * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { - public constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super( - LightPushCodec, - libp2p.components, - log, - options!.pubsubTopics!, - options - ); + public constructor( + public readonly pubsubTopics: PubsubTopic[], + libp2p: Libp2p + ) { + super(LightPushCodec, libp2p.components, log, pubsubTopics); } private async preparePushMessage( diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index 183f5b3aa5..3dbf7ed8a2 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -6,10 +6,11 @@ import { type MetadataQueryResult, type PeerIdStr, ProtocolError, + PubsubTopic, type ShardInfo } from "@waku/interfaces"; import { proto_metadata } from "@waku/proto"; -import { encodeRelayShard, Logger, shardInfoToPubsubTopics } from "@waku/utils"; +import { encodeRelayShard, Logger, pubsubTopicsToShardInfo } from "@waku/utils"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; @@ -26,15 +27,10 @@ class Metadata extends BaseProtocol implements IMetadata { protected handshakesConfirmed: Map = new Map(); public constructor( - public shardInfo: ShardInfo, + public pubsubTopics: PubsubTopic[], libp2p: Libp2pComponents ) { - super( - MetadataCodec, - libp2p.components, - log, - shardInfoToPubsubTopics(shardInfo) - ); + super(MetadataCodec, libp2p.components, log, pubsubTopics); this.libp2pComponents = libp2p; void libp2p.registrar.handle(MetadataCodec, (streamData) => { void this.onRequest(streamData); @@ -45,7 +41,9 @@ class Metadata extends BaseProtocol implements IMetadata { * Make a metadata query to a peer */ public async query(peerId: PeerId): Promise { - const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo); + const request = proto_metadata.WakuMetadataRequest.encode( + pubsubTopicsToShardInfo(this.pubsubTopics) + ); const peer = await this.peerStore.get(peerId); if (!peer) { @@ -112,7 +110,7 @@ class Metadata extends BaseProtocol implements IMetadata { try { const { stream, connection } = streamData; const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode( - this.shardInfo + pubsubTopicsToShardInfo(this.pubsubTopics) ); const encodedResponse = await pipe( @@ -177,7 +175,8 @@ class Metadata extends BaseProtocol implements IMetadata { } export function wakuMetadata( - shardInfo: ShardInfo + pubsubTopics: PubsubTopic[] ): (components: Libp2pComponents) => IMetadata { - return (components: Libp2pComponents) => new Metadata(shardInfo, components); + return (components: Libp2pComponents) => + new Metadata(pubsubTopics, components); } diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 352461281f..a4b8b9c2e7 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -4,7 +4,7 @@ import { IDecoder, IStoreCore, Libp2p, - ProtocolCreateOptions, + PubsubTopic, QueryRequestParams } from "@waku/interfaces"; import { Logger } from "@waku/utils"; @@ -28,14 +28,11 @@ const log = new Logger("store"); export const StoreCodec = "/vac/waku/store-query/3.0.0"; export class StoreCore extends BaseProtocol implements IStoreCore { - public constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super( - StoreCodec, - libp2p.components, - log, - options?.pubsubTopics || [], - options - ); + public constructor( + public readonly pubsubTopics: PubsubTopic[], + libp2p: Libp2p + ) { + super(StoreCodec, libp2p.components, log, pubsubTopics); } public async *queryPerPage( diff --git a/packages/interfaces/src/connection_manager.ts b/packages/interfaces/src/connection_manager.ts index 8c57e8210f..d85991eb3c 100644 --- a/packages/interfaces/src/connection_manager.ts +++ b/packages/interfaces/src/connection_manager.ts @@ -1,5 +1,7 @@ import type { Peer, PeerId, TypedEventEmitter } from "@libp2p/interface"; +import { PubsubTopic } from "./misc"; + export enum Tags { BOOTSTRAP = "bootstrap", PEER_EXCHANGE = "peer-exchange", @@ -61,6 +63,7 @@ export interface IConnectionStateEvents { export interface IConnectionManager extends TypedEventEmitter { + configuredPubsubTopics: PubsubTopic[]; dropConnection(peerId: PeerId): Promise; getPeersByDiscovery(): Promise; stop(): void; diff --git a/packages/interfaces/src/constants.ts b/packages/interfaces/src/constants.ts index 92c8afd7dc..f50485174d 100644 --- a/packages/interfaces/src/constants.ts +++ b/packages/interfaces/src/constants.ts @@ -1,4 +1,4 @@ -import { ShardInfo } from "./enr"; +import type { ShardInfo } from "./sharding"; /** * The default cluster ID for The Waku Network @@ -12,3 +12,5 @@ export const DefaultShardInfo: ShardInfo = { clusterId: DEFAULT_CLUSTER_ID, shards: [0, 1, 2, 3, 4, 5, 6, 7, 8] }; + +export const DefaultNetworkConfig = DefaultShardInfo; diff --git a/packages/interfaces/src/enr.ts b/packages/interfaces/src/enr.ts index 2d74614346..6bb663edc1 100644 --- a/packages/interfaces/src/enr.ts +++ b/packages/interfaces/src/enr.ts @@ -2,6 +2,8 @@ import type { PeerId } from "@libp2p/interface"; import type { PeerInfo } from "@libp2p/interface"; import type { Multiaddr } from "@multiformats/multiaddr"; +import { ShardInfo } from "./sharding"; + export type ENRKey = string; export type ENRValue = Uint8Array; /** @@ -18,11 +20,6 @@ export interface Waku2 { lightPush: boolean; } -export interface ShardInfo { - clusterId: number; - shards: number[]; -} - export interface IEnr extends Map { nodeId?: NodeId; peerId?: PeerId; diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 44db3dcc62..8cfe38114f 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -18,3 +18,4 @@ export * from "./metadata.js"; export * from "./constants.js"; export * from "./local_storage.js"; export * from "./health_manager.js"; +export * from "./sharding.js"; diff --git a/packages/interfaces/src/metadata.ts b/packages/interfaces/src/metadata.ts index 31f0ced66e..974a3a4b5f 100644 --- a/packages/interfaces/src/metadata.ts +++ b/packages/interfaces/src/metadata.ts @@ -1,14 +1,14 @@ import type { PeerId } from "@libp2p/interface"; -import { type ShardInfo } from "./enr.js"; -import { ThisOrThat } from "./misc.js"; -import type { IBaseProtocolCore, ShardingParams } from "./protocols.js"; +import { PubsubTopic, ThisOrThat } from "./misc.js"; +import type { IBaseProtocolCore } from "./protocols.js"; +import type { ShardInfo } from "./sharding.js"; export type MetadataQueryResult = ThisOrThat<"shardInfo", ShardInfo>; // IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol export interface IMetadata extends Omit { - shardInfo: ShardingParams; + pubsubTopics: PubsubTopic[]; confirmOrAttemptHandshake(peerId: PeerId): Promise; query(peerId: PeerId): Promise; } diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 6d0a051513..47149b48f9 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -2,10 +2,10 @@ import type { Libp2p } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface"; import type { Peer, PeerStore } from "@libp2p/interface"; -import type { ShardInfo } from "./enr.js"; import type { CreateLibp2pOptions } from "./libp2p.js"; import type { IDecodedMessage } from "./message.js"; -import { PubsubTopic, ThisAndThat, ThisOrThat } from "./misc.js"; +import { ThisAndThat, ThisOrThat } from "./misc.js"; +import { AutoSharding, StaticSharding } from "./sharding.js"; export enum Protocols { Relay = "relay", @@ -15,7 +15,6 @@ export enum Protocols { } export type IBaseProtocolCore = { - shardInfo?: ShardInfo; multicodec: string; peerStore: PeerStore; allPeers: () => Promise; @@ -30,18 +29,7 @@ export type IBaseProtocolSDK = { readonly numPeersToUse: number; }; -export type ContentTopicInfo = { - clusterId?: number; - contentTopics: string[]; -}; - -export type ApplicationInfo = { - clusterId: number; - application: string; - version: string; -}; - -export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo; +export type NetworkConfig = StaticSharding | AutoSharding; //TODO: merge this with ProtocolCreateOptions or establish distinction: https://github.com/waku-org/js-waku/issues/2048 /** @@ -72,38 +60,35 @@ export type ProtocolUseOptions = { export type ProtocolCreateOptions = { /** - * @deprecated - * Should be used ONLY if some other than The Waku Network is in use. + * Configuration for determining the network in use. * - * See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#pubsub-topics) for details. - * - * This is used by: - * - WakuRelay to receive, route and send messages, - * - WakuLightPush to send messages, - * - WakuStore to retrieve messages. - * - * If no pubsub topic is specified, the default pubsub topic will be determined from DefaultShardInfo. - * - * You cannot add or remove pubsub topics after initialization of the node. - */ - pubsubTopics?: PubsubTopic[]; - /** - * ShardInfo is used to determine which network is in use. - * Defaults to {@link @waku/interfaces!DefaultShardInfo}. - * Default value is configured for The Waku Network - * - * The format to specify a shard is: - * clusterId: number, shards: number[] + * If using Static Sharding: + * Default value is configured for The Waku Network. + * The format to specify a shard is: clusterId: number, shards: number[] * To learn more about the sharding specification, see [Relay Sharding](https://rfc.vac.dev/spec/51/). - */ - shardInfo?: Partial; - /** - * Content topics are used to determine network in use. - * See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#content-topics) for details. * + * If using Auto Sharding: + * See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#content-topics) for details. * You cannot add or remove content topics after initialization of the node. */ - contentTopics?: string[]; + /** + * Configuration for determining the network in use. + * Network configuration refers to the shards and clusters used in the network. + * + * If using Static Sharding: + * Cluster ID and shards are specified in the format: clusterId: number, shards: number[] + * The default value is configured for The Waku Network => clusterId: 0, shards: [0, 1, 2, 3, 4, 5, 6, 7] + * To learn more about the sharding specification, see [Relay Sharding](https://rfc.vac.dev/spec/51/). + * + * If using Auto Sharding: + * Cluster ID and content topics are specified in the format: clusterId: number, contentTopics: string[] + * Content topics are used to determine the shards to be configured for the network. + * Cluster ID is optional, and defaults to The Waku Network's cluster ID => 0 + * To specify content topics, see [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#content-topics) for details + * + * @default { clusterId: 1, shards: [0, 1, 2, 3, 4, 5, 6, 7] } + */ + networkConfig?: NetworkConfig; /** * You can pass options to the `Libp2p` instance used by {@link @waku/sdk!WakuNode} using the `libp2p` property. * This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) diff --git a/packages/interfaces/src/sharding.ts b/packages/interfaces/src/sharding.ts new file mode 100644 index 0000000000..110bf0e74f --- /dev/null +++ b/packages/interfaces/src/sharding.ts @@ -0,0 +1,12 @@ +export type ShardInfo = { + clusterId: number; + shards: number[]; +}; + +export type ContentTopicInfo = { + clusterId?: number; + contentTopics: string[]; +}; + +export type StaticSharding = ShardInfo; +export type AutoSharding = ContentTopicInfo; diff --git a/packages/sdk/src/create/create.ts b/packages/sdk/src/create/create.ts index 8b1ca9562d..4e0b3706c5 100644 --- a/packages/sdk/src/create/create.ts +++ b/packages/sdk/src/create/create.ts @@ -1,6 +1,6 @@ import { type LightNode } from "@waku/interfaces"; -import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js"; +import { CreateWakuNodeOptions, WakuNode } from "../waku.js"; import { createLibp2pAndUpdateOptions } from "./libp2p.js"; @@ -12,9 +12,9 @@ import { createLibp2pAndUpdateOptions } from "./libp2p.js"; export async function createLightNode( options: CreateWakuNodeOptions = {} ): Promise { - const libp2p = await createLibp2pAndUpdateOptions(options); + const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); - return new WakuNode(options as WakuOptions, libp2p, { + return new WakuNode(pubsubTopics, options, libp2p, { store: true, lightpush: true, filter: true diff --git a/packages/sdk/src/create/libp2p.ts b/packages/sdk/src/create/libp2p.ts index 4e0445fa86..ba66dd0b29 100644 --- a/packages/sdk/src/create/libp2p.ts +++ b/packages/sdk/src/create/libp2p.ts @@ -9,14 +9,14 @@ import { all as filterAll, wss } from "@libp2p/websockets/filters"; import { wakuMetadata } from "@waku/core"; import { type CreateLibp2pOptions, - DefaultShardInfo, + DefaultNetworkConfig, type IMetadata, type Libp2p, type Libp2pComponents, - type ShardInfo + PubsubTopic } from "@waku/interfaces"; import { wakuGossipSub } from "@waku/relay"; -import { ensureShardingConfigured, Logger } from "@waku/utils"; +import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils"; import { createLibp2p } from "libp2p"; import { @@ -35,10 +35,10 @@ type MetadataService = { metadata?: (components: Libp2pComponents) => IMetadata; }; -const logger = new Logger("sdk:create"); +const log = new Logger("sdk:create"); export async function defaultLibp2p( - shardInfo?: ShardInfo, + pubsubTopics: PubsubTopic[], wakuGossipSub?: PubsubService["pubsub"], options?: Partial, userAgent?: string @@ -60,8 +60,8 @@ export async function defaultLibp2p( ? { pubsub: wakuGossipSub } : {}; - const metadataService: MetadataService = shardInfo - ? { metadata: wakuMetadata(shardInfo) } + const metadataService: MetadataService = pubsubTopics + ? { metadata: wakuMetadata(pubsubTopics) } : {}; const filter = process?.env?.NODE_ENV === "test" ? filterAll : wss; @@ -91,14 +91,18 @@ export async function defaultLibp2p( export async function createLibp2pAndUpdateOptions( options: CreateWakuNodeOptions -): Promise { - const shardInfo = configureNetworkOptions(options); +): Promise<{ libp2p: Libp2p; pubsubTopics: PubsubTopic[] }> { + const { networkConfig } = options; + const pubsubTopics = derivePubsubTopicsFromNetworkConfig( + networkConfig ?? DefaultNetworkConfig + ); + log.info("Creating Waku node with pubsub topics", pubsubTopics); const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { - peerDiscovery.push(...defaultPeerDiscoveries(options.pubsubTopics!)); + peerDiscovery.push(...defaultPeerDiscoveries(pubsubTopics)); } if (options?.bootstrapPeers) { @@ -108,64 +112,11 @@ export async function createLibp2pAndUpdateOptions( libp2pOptions.peerDiscovery = peerDiscovery; const libp2p = await defaultLibp2p( - shardInfo, + pubsubTopics, wakuGossipSub(options), libp2pOptions, options?.userAgent ); - return libp2p; -} - -function configureNetworkOptions( - options: CreateWakuNodeOptions -): ShardInfo | undefined { - const flags = [ - options.contentTopics, - options.pubsubTopics, - options.shardInfo - ].filter((v) => !!v); - - if (flags.length > 1) { - throw Error( - "Too many network configurations provided. Pass only one of: pubsubTopic, contentTopics or shardInfo." - ); - } - - logWhichShardInfoIsUsed(options); - - if (options.contentTopics) { - options.shardInfo = { contentTopics: options.contentTopics }; - } - - if (!options.shardInfo) { - options.shardInfo = DefaultShardInfo; - } - - const shardInfo = options.shardInfo - ? ensureShardingConfigured(options.shardInfo) - : undefined; - - options.pubsubTopics = options.pubsubTopics ?? shardInfo?.pubsubTopics; - - return shardInfo?.shardInfo; -} - -function logWhichShardInfoIsUsed(options: CreateWakuNodeOptions): void { - if (options.pubsubTopics) { - logger.info("Using pubsubTopics array to bootstrap the node."); - return; - } - - if (options.contentTopics) { - logger.info( - "Using contentTopics and default cluster ID (1) to bootstrap the node." - ); - return; - } - - if (options.shardInfo) { - logger.info("Using shardInfo parameters to bootstrap the node."); - return; - } + return { libp2p, pubsubTopics }; } diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 36a3d6f820..18f1dc7521 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -13,13 +13,13 @@ import { type IProtoMessage, type ISubscriptionSDK, type Libp2p, + NetworkConfig, type PeerIdStr, type ProtocolCreateOptions, ProtocolError, type ProtocolUseOptions, type PubsubTopic, type SDKProtocolResult, - type ShardingParams, type SubscribeOptions, SubscribeResult, type Unsubscribe @@ -437,8 +437,8 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { await subscription.processIncomingMessage(wakuMessage, peerIdStr); }, - libp2p, - options + connectionManager.configuredPubsubTopics, + libp2p ), connectionManager, { numPeersToUse: options?.numPeersToUse } @@ -541,7 +541,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { * @returns The subscription object. */ private async createSubscription( - pubsubTopicShardInfo: ShardingParams | PubsubTopic, + pubsubTopicShardInfo: NetworkConfig | PubsubTopic, options?: ProtocolUseOptions ): Promise { options = { diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/light_push.ts index a2794b4371..1a49fa66f6 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/light_push.ts @@ -25,9 +25,13 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { libp2p: Libp2p, options?: ProtocolCreateOptions ) { - super(new LightPushCore(libp2p, options), connectionManager, { - numPeersToUse: options?.numPeersToUse - }); + super( + new LightPushCore(connectionManager.configuredPubsubTopics, libp2p), + connectionManager, + { + numPeersToUse: options?.numPeersToUse + } + ); this.protocol = this.core as LightPushCore; } diff --git a/packages/sdk/src/protocols/store.ts b/packages/sdk/src/protocols/store.ts index c84e5e87f0..0390c8a40f 100644 --- a/packages/sdk/src/protocols/store.ts +++ b/packages/sdk/src/protocols/store.ts @@ -4,7 +4,6 @@ import { IDecoder, IStoreSDK, Libp2p, - ProtocolCreateOptions, QueryRequestParams, StoreCursor } from "@waku/interfaces"; @@ -24,14 +23,14 @@ const log = new Logger("waku:store:sdk"); export class StoreSDK extends BaseProtocolSDK implements IStoreSDK { public readonly protocol: StoreCore; - public constructor( - connectionManager: ConnectionManager, - libp2p: Libp2p, - options?: ProtocolCreateOptions - ) { - super(new StoreCore(libp2p, options), connectionManager, { - numPeersToUse: DEFAULT_NUM_PEERS - }); + public constructor(connectionManager: ConnectionManager, libp2p: Libp2p) { + super( + new StoreCore(connectionManager.configuredPubsubTopics, libp2p), + connectionManager, + { + numPeersToUse: DEFAULT_NUM_PEERS + } + ); this.protocol = this.core as StoreCore; } @@ -238,10 +237,9 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK { * @returns A function that takes a Libp2p instance and returns a StoreSDK instance. */ export function wakuStore( - connectionManager: ConnectionManager, - init: Partial = {} + connectionManager: ConnectionManager ): (libp2p: Libp2p) => IStoreSDK { return (libp2p: Libp2p) => { - return new StoreSDK(connectionManager, libp2p, init); + return new StoreSDK(connectionManager, libp2p); }; } diff --git a/packages/sdk/src/relay-node/index.ts b/packages/sdk/src/relay-node/index.ts index f30eb27113..4a5de91494 100644 --- a/packages/sdk/src/relay-node/index.ts +++ b/packages/sdk/src/relay-node/index.ts @@ -15,13 +15,11 @@ import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js"; * or use this function with caution. */ export async function createRelayNode( - options: CreateWakuNodeOptions & Partial = { - pubsubTopics: [] - } + options: CreateWakuNodeOptions & Partial ): Promise { - const libp2p = await createLibp2pAndUpdateOptions(options); + const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); - return new WakuNode(options as WakuOptions, libp2p, { + return new WakuNode(pubsubTopics, options as WakuOptions, libp2p, { relay: true }) as RelayNode; } @@ -40,13 +38,11 @@ export async function createRelayNode( * @internal */ export async function createFullNode( - options: CreateWakuNodeOptions & Partial = { - pubsubTopics: [] - } + options: CreateWakuNodeOptions & Partial ): Promise { - const libp2p = await createLibp2pAndUpdateOptions(options); + const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); - return new WakuNode(options as WakuOptions, libp2p, { + return new WakuNode(pubsubTopics, options as WakuOptions, libp2p, { filter: true, lightpush: true, relay: true, diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index 95977419f4..ae79a71849 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -48,7 +48,6 @@ export interface WakuOptions { * @default {@link @waku/core.DefaultUserAgent} */ userAgent?: string; - pubsubTopics: PubsubTopic[]; } export type CreateWakuNodeOptions = ProtocolCreateOptions & @@ -68,19 +67,14 @@ export class WakuNode implements Waku { public filter?: IFilterSDK; public lightPush?: ILightPushSDK; public connectionManager: ConnectionManager; - public readonly pubsubTopics: PubsubTopic[]; public readonly health: IHealthManager; public constructor( + public readonly pubsubTopics: PubsubTopic[], options: WakuOptions, libp2p: Libp2p, protocolsEnabled: ProtocolsEnabled ) { - if (options.pubsubTopics.length == 0) { - throw new Error("At least one pubsub topic must be provided"); - } - this.pubsubTopics = options.pubsubTopics; - this.libp2p = libp2p; protocolsEnabled = { @@ -110,17 +104,17 @@ export class WakuNode implements Waku { this.health = getHealthManager(); if (protocolsEnabled.store) { - const store = wakuStore(this.connectionManager, options); + const store = wakuStore(this.connectionManager); this.store = store(libp2p); } if (protocolsEnabled.lightpush) { - const lightPush = wakuLightPush(this.connectionManager, options); + const lightPush = wakuLightPush(this.connectionManager); this.lightPush = lightPush(libp2p); } if (protocolsEnabled.filter) { - const filter = wakuFilter(this.connectionManager, options); + const filter = wakuFilter(this.connectionManager); this.filter = filter(libp2p); } diff --git a/packages/tests/src/lib/index.ts b/packages/tests/src/lib/index.ts index 28b6f11b79..c123a88c4d 100644 --- a/packages/tests/src/lib/index.ts +++ b/packages/tests/src/lib/index.ts @@ -1,6 +1,6 @@ import { DecodedMessage } from "@waku/core"; -import { PubsubTopic, ShardingParams } from "@waku/interfaces"; -import { ensureShardingConfigured, Logger } from "@waku/utils"; +import { NetworkConfig } from "@waku/interfaces"; +import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils"; import { expect } from "chai"; import { DefaultTestPubsubTopic } from "../constants"; @@ -23,10 +23,9 @@ const log = new Logger("test:message-collector"); export class ServiceNodesFleet { public static async createAndRun( mochaContext: Mocha.Context, - pubsubTopics: PubsubTopic[], nodesToCreate: number = 3, strictChecking: boolean = false, - shardInfo?: ShardingParams, + networkConfig: NetworkConfig, _args?: Args, withoutFilter = false ): Promise { @@ -38,10 +37,7 @@ export class ServiceNodesFleet { Math.random().toString(36).substring(7) ); - shardInfo = shardInfo - ? ensureShardingConfigured(shardInfo).shardInfo - : undefined; - const args = getArgs(pubsubTopics, shardInfo, _args); + const args = getArgs(networkConfig, _args); await node.start(args, { retries: 3 }); @@ -266,11 +262,8 @@ class MultipleNodesMessageCollector { } } -function getArgs( - pubsubTopics: PubsubTopic[], - shardInfo?: ShardingParams, - args?: Args -): Args { +function getArgs(networkConfig: NetworkConfig, args?: Args): Args { + const pubsubTopics = derivePubsubTopicsFromNetworkConfig(networkConfig); const defaultArgs = { lightpush: true, filter: true, @@ -278,7 +271,7 @@ function getArgs( peerExchange: true, relay: true, pubsubTopic: pubsubTopics, - ...(shardInfo && { clusterId: shardInfo.clusterId }) + clusterId: networkConfig.clusterId } as Args; return { ...defaultArgs, ...args }; diff --git a/packages/tests/src/lib/runNodes.ts b/packages/tests/src/lib/runNodes.ts index 908b4e274d..d5509d7f32 100644 --- a/packages/tests/src/lib/runNodes.ts +++ b/packages/tests/src/lib/runNodes.ts @@ -1,13 +1,16 @@ import { waitForRemotePeer } from "@waku/core"; import { - ContentTopicInfo, + NetworkConfig, ProtocolCreateOptions, - Protocols, - ShardingParams + Protocols } from "@waku/interfaces"; import { createLightNode, WakuNode } from "@waku/sdk"; import { createRelayNode } from "@waku/sdk/relay"; -import { Logger, shardInfoToPubsubTopics } from "@waku/utils"; +import { + derivePubsubTopicsFromNetworkConfig, + Logger, + pubsubTopicsToShardInfo +} from "@waku/utils"; import { Context } from "mocha"; import { NOISE_KEY_1 } from "../constants.js"; @@ -19,7 +22,7 @@ export const log = new Logger("test:runNodes"); type RunNodesOptions = { context: Context; - shardInfo: ShardingParams; + networkConfig: NetworkConfig; protocols: Protocols[]; createNode: typeof createLightNode | typeof createRelayNode; }; @@ -27,14 +30,11 @@ type RunNodesOptions = { export async function runNodes( options: RunNodesOptions ): Promise<[ServiceNode, T]> { - const { context, shardInfo, createNode, protocols } = options; + const { context, networkConfig, createNode, protocols } = options; const nwaku = new ServiceNode(makeLogFileName(context)); - const pubsubTopics = shardInfoToPubsubTopics(shardInfo); - - function isContentTopicInfo(info: ShardingParams): info is ContentTopicInfo { - return (info as ContentTopicInfo).contentTopics !== undefined; - } + const pubsubTopics = derivePubsubTopicsFromNetworkConfig(networkConfig); + const shardInfo = pubsubTopicsToShardInfo(pubsubTopics); await nwaku.start( { @@ -43,19 +43,14 @@ export async function runNodes( relay: true, store: true, pubsubTopic: pubsubTopics, - // Conditionally include clusterId if shardInfo exists - ...(shardInfo && { clusterId: shardInfo.clusterId }), - // Conditionally include contentTopic if shardInfo exists and clusterId is 1 - ...(shardInfo && - isContentTopicInfo(shardInfo) && - shardInfo.clusterId === 1 && { contentTopic: shardInfo.contentTopics }) + clusterId: shardInfo.clusterId }, { retries: 3 } ); const waku_options: ProtocolCreateOptions = { staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, - shardInfo + networkConfig: shardInfo }; log.info("Starting js waku node with :", JSON.stringify(waku_options)); diff --git a/packages/tests/src/utils/nodes.ts b/packages/tests/src/utils/nodes.ts index 7d11ef8e12..511694a5d6 100644 --- a/packages/tests/src/utils/nodes.ts +++ b/packages/tests/src/utils/nodes.ts @@ -1,17 +1,18 @@ import { waitForRemotePeer } from "@waku/core"; import { + DefaultNetworkConfig, LightNode, + NetworkConfig, ProtocolCreateOptions, Protocols, - ShardingParams, Waku } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; -import { isDefined, shardInfoToPubsubTopics } from "@waku/utils"; +import { derivePubsubTopicsFromNetworkConfig, isDefined } from "@waku/utils"; import { Context } from "mocha"; import pRetry from "p-retry"; -import { DefaultTestPubsubTopic, NOISE_KEY_1 } from "../constants"; +import { NOISE_KEY_1 } from "../constants"; import { ServiceNodesFleet } from "../lib"; import { Args } from "../types"; @@ -19,22 +20,18 @@ import { waitForConnections } from "./waitForConnections"; export async function runMultipleNodes( context: Context, - shardInfo?: ShardingParams, + networkConfig: NetworkConfig = DefaultNetworkConfig, customArgs?: Args, strictChecking: boolean = false, numServiceNodes = 3, withoutFilter = false ): Promise<[ServiceNodesFleet, LightNode]> { - const pubsubTopics = shardInfo - ? shardInfoToPubsubTopics(shardInfo) - : [DefaultTestPubsubTopic]; // create numServiceNodes nodes const serviceNodes = await ServiceNodesFleet.createAndRun( context, - pubsubTopics, numServiceNodes, strictChecking, - shardInfo, + networkConfig, customArgs, withoutFilter ); @@ -43,15 +40,10 @@ export async function runMultipleNodes( staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } - } + }, + networkConfig }; - if (shardInfo) { - wakuOptions.shardInfo = shardInfo; - } else { - wakuOptions.pubsubTopics = pubsubTopics; - } - const waku = await createLightNode(wakuOptions); await waku.start(); @@ -68,7 +60,9 @@ export async function runMultipleNodes( !customArgs?.lightpush ? undefined : Protocols.LightPush ].filter(isDefined) ); - await node.ensureSubscriptions(pubsubTopics); + await node.ensureSubscriptions( + derivePubsubTopicsFromNetworkConfig(networkConfig) + ); const wakuConnections = waku.libp2p.getConnections(); const nodePeers = await node.peers(); diff --git a/packages/tests/tests/connection-mananger/connection_state.spec.ts b/packages/tests/tests/connection-mananger/connection_state.spec.ts index c1c7e7b351..2764c36dc8 100644 --- a/packages/tests/tests/connection-mananger/connection_state.spec.ts +++ b/packages/tests/tests/connection-mananger/connection_state.spec.ts @@ -29,7 +29,11 @@ describe("Connection state", function () { let nwaku2PeerId: Multiaddr; beforeEachCustom(this, async () => { - waku = await createLightNode({ shardInfo: DefaultTestShardInfo }); + try { + waku = await createLightNode({ networkConfig: DefaultTestShardInfo }); + } catch (error) { + console.error(error); + } nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); await nwaku1.start({ filter: true }); @@ -91,11 +95,11 @@ describe("Connection state", function () { it("`waku:online` between 2 js-waku relay nodes", async function () { const waku1 = await createRelayNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }); const waku2 = await createRelayNode({ libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }); let eventCount1 = 0; diff --git a/packages/tests/tests/connection-mananger/dials.spec.ts b/packages/tests/tests/connection-mananger/dials.spec.ts index a157c7a3c4..babd4f2533 100644 --- a/packages/tests/tests/connection-mananger/dials.spec.ts +++ b/packages/tests/tests/connection-mananger/dials.spec.ts @@ -21,7 +21,7 @@ describe("Dials", function () { let waku: LightNode; beforeEachCustom(this, async () => { - waku = await createLightNode({ shardInfo: { shards: [0] } }); + waku = await createLightNode(); isPeerTopicConfigured = sinon.stub( waku.connectionManager as any, "isPeerTopicConfigured" diff --git a/packages/tests/tests/connection-mananger/events.spec.ts b/packages/tests/tests/connection-mananger/events.spec.ts index 637c355f97..e8f1a5ba63 100644 --- a/packages/tests/tests/connection-mananger/events.spec.ts +++ b/packages/tests/tests/connection-mananger/events.spec.ts @@ -19,7 +19,7 @@ describe("Events", function () { let waku: LightNode; this.timeout(TEST_TIMEOUT); beforeEachCustom(this, async () => { - waku = await createLightNode({ shardInfo: { shards: [0] } }); + waku = await createLightNode(); }); afterEachCustom(this, async () => { diff --git a/packages/tests/tests/connection-mananger/methods.spec.ts b/packages/tests/tests/connection-mananger/methods.spec.ts index 1a2175e1cf..d2bde33f9e 100644 --- a/packages/tests/tests/connection-mananger/methods.spec.ts +++ b/packages/tests/tests/connection-mananger/methods.spec.ts @@ -19,7 +19,7 @@ describe("Public methods", function () { let waku: LightNode; this.timeout(TEST_TIMEOUT); beforeEachCustom(this, async () => { - waku = await createLightNode({ shardInfo: { shards: [0] } }); + waku = await createLightNode(); }); afterEachCustom(this, async () => { diff --git a/packages/tests/tests/enr.node.spec.ts b/packages/tests/tests/enr.node.spec.ts index 1ea6fe5f9b..383a8be946 100644 --- a/packages/tests/tests/enr.node.spec.ts +++ b/packages/tests/tests/enr.node.spec.ts @@ -37,7 +37,7 @@ describe("ENR Interop: ServiceNode", function () { waku = await createRelayNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }); await waku.start(); await waku.dial(multiAddrWithId); @@ -71,7 +71,7 @@ describe("ENR Interop: ServiceNode", function () { waku = await createRelayNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }); await waku.start(); await waku.dial(multiAddrWithId); @@ -106,7 +106,7 @@ describe("ENR Interop: ServiceNode", function () { waku = await createRelayNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }); await waku.start(); await waku.dial(multiAddrWithId); diff --git a/packages/tests/tests/ephemeral.node.spec.ts b/packages/tests/tests/ephemeral.node.spec.ts index 2c11e19c5c..c9a68a110f 100644 --- a/packages/tests/tests/ephemeral.node.spec.ts +++ b/packages/tests/tests/ephemeral.node.spec.ts @@ -107,7 +107,7 @@ describe("Waku Message Ephemeral field", function () { waku = await createLightNode({ staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, - shardInfo: { + networkConfig: { contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic], clusterId: ClusterId } @@ -142,14 +142,14 @@ describe("Waku Message Ephemeral field", function () { const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ createLightNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: { + networkConfig: { contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic], clusterId: ClusterId } }).then((waku) => waku.start().then(() => waku)), createLightNode({ staticNoiseKey: NOISE_KEY_2, - shardInfo: { + networkConfig: { contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic], clusterId: ClusterId } diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts index dcc4c2740b..a88d9874bf 100644 --- a/packages/tests/tests/filter/ping.node.spec.ts +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -26,7 +26,11 @@ const runTests = (strictCheckNodes: boolean): void => { let serviceNodes: ServiceNodesFleet; beforeEachCustom(this, async () => { - [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo); + try { + [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo); + } catch (error) { + console.error(error); + } }); afterEachCustom(this, async () => { diff --git a/packages/tests/tests/filter/single_node/utils.ts b/packages/tests/tests/filter/single_node/utils.ts index da2bf7b328..0280bcb5d2 100644 --- a/packages/tests/tests/filter/single_node/utils.ts +++ b/packages/tests/tests/filter/single_node/utils.ts @@ -1,4 +1,4 @@ -import { LightNode, Protocols, ShardingParams } from "@waku/interfaces"; +import { LightNode, NetworkConfig, Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { Logger } from "@waku/utils"; import { Context } from "mocha"; @@ -12,11 +12,11 @@ export const log = new Logger("test:filter:single_node"); export const runNodes = ( context: Context, - shardInfo: ShardingParams + shardInfo: NetworkConfig ): Promise<[ServiceNode, LightNode]> => runNodesBuilder({ context, createNode: createLightNode, protocols: [Protocols.LightPush, Protocols.Filter], - shardInfo + networkConfig: shardInfo }); diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 930f8b4afb..1976b5e075 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -1,28 +1,24 @@ import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import { + DefaultNetworkConfig, ISubscriptionSDK, LightNode, + NetworkConfig, ProtocolCreateOptions, Protocols, - ShardingParams, Waku } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { contentTopicToPubsubTopic, - Logger, - shardInfoToPubsubTopics + derivePubsubTopicsFromNetworkConfig, + Logger } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import { Context } from "mocha"; import pRetry from "p-retry"; -import { - DefaultTestPubsubTopic, - NOISE_KEY_1, - ServiceNodesFleet, - waitForConnections -} from "../../src"; +import { NOISE_KEY_1, ServiceNodesFleet, waitForConnections } from "../../src"; // Constants for test configuration. export const log = new Logger("test:filter"); @@ -69,21 +65,18 @@ export async function validatePingError( export async function runMultipleNodes( context: Context, - shardInfo?: ShardingParams, + networkConfig: NetworkConfig = DefaultNetworkConfig, strictChecking: boolean = false, numServiceNodes = 3, withoutFilter = false ): Promise<[ServiceNodesFleet, LightNode]> { - const pubsubTopics = shardInfo - ? shardInfoToPubsubTopics(shardInfo) - : [DefaultTestPubsubTopic]; + const pubsubTopics = derivePubsubTopicsFromNetworkConfig(networkConfig); // create numServiceNodes nodes const serviceNodes = await ServiceNodesFleet.createAndRun( context, - pubsubTopics, numServiceNodes, strictChecking, - shardInfo, + networkConfig, undefined, withoutFilter ); @@ -95,12 +88,6 @@ export async function runMultipleNodes( } }; - if (shardInfo) { - wakuOptions.shardInfo = shardInfo; - } else { - wakuOptions.pubsubTopics = pubsubTopics; - } - log.info("Starting js waku node with :", JSON.stringify(wakuOptions)); let waku: LightNode | undefined; try { diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index f8c2a4d6c5..252340a5bf 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -66,7 +66,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); - waku = await createLightNode({ shardInfo }); + waku = await createLightNode({ networkConfig: shardInfo }); await waku.start(); await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec); await waitForRemotePeer(waku, [Protocols.LightPush]); @@ -115,7 +115,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); - waku = await createLightNode({ shardInfo: shardInfo2 }); + waku = await createLightNode({ networkConfig: shardInfo2 }); await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); await waku.start(); @@ -166,7 +166,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); - waku = await createLightNode({ shardInfo: shardInfo2 }); + waku = await createLightNode({ networkConfig: shardInfo2 }); await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); await delay(500); await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); @@ -220,7 +220,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId(); const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId(); - waku = await createLightNode({ shardInfo: shardInfo2 }); + waku = await createLightNode({ networkConfig: shardInfo2 }); await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec); await delay(500); await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec); @@ -256,7 +256,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); - waku = await createLightNode({ shardInfo }); + waku = await createLightNode({ networkConfig: shardInfo }); await waku.start(); await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec); await waitForRemotePeer(waku, [Protocols.LightPush]); @@ -307,7 +307,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); - waku = await createLightNode({ shardInfo: shardInfo2 }); + waku = await createLightNode({ networkConfig: shardInfo2 }); await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); @@ -360,7 +360,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); - waku = await createLightNode({ shardInfo: shardInfo2 }); + waku = await createLightNode({ networkConfig: shardInfo2 }); await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); await delay(500); await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); @@ -415,7 +415,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId(); const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId(); - waku = await createLightNode({ shardInfo: shardInfo2 }); + waku = await createLightNode({ networkConfig: shardInfo2 }); await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec); await delay(500); await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec); @@ -454,7 +454,7 @@ describe("getPeers", function () { let allPeers: Peer[]; beforeEachCustom(this, async () => { - waku = await createLightNode({ shardInfo: DefaultTestShardInfo }); + waku = await createLightNode({ networkConfig: DefaultTestShardInfo }); peerStore = waku.libp2p.peerStore; connectionManager = waku.libp2p.components.connectionManager; diff --git a/packages/tests/tests/health-manager/node.spec.ts b/packages/tests/tests/health-manager/node.spec.ts index 1eaebaab68..decd505cc6 100644 --- a/packages/tests/tests/health-manager/node.spec.ts +++ b/packages/tests/tests/health-manager/node.spec.ts @@ -133,7 +133,7 @@ async function setupTestEnvironment( ); serviceNodes.push(...serviceNodesFleet.nodes); } else { - waku = await createLightNode({ shardInfo: TestShardInfo }); + waku = await createLightNode({ networkConfig: TestShardInfo }); } // Create additional LightPush nodes if needed diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index b971f9b12c..4fabf1919f 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -3,9 +3,9 @@ import { createEncoder, waitForRemotePeer } from "@waku/core"; import { ContentTopicInfo, LightNode, + NetworkConfig, Protocols, ShardInfo, - ShardingParams, SingleShardInfo } from "@waku/interfaces"; import { @@ -344,7 +344,7 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () pubsubTopicShardInfo: shardInfo2 }); - const testShardInfo: ShardingParams = { + const testShardInfo: NetworkConfig = { clusterId, shards: [ contentTopicToShardIndex(customContentTopic1), diff --git a/packages/tests/tests/metadata.spec.ts b/packages/tests/tests/metadata.spec.ts index 95a5cb4102..98099873e8 100644 --- a/packages/tests/tests/metadata.spec.ts +++ b/packages/tests/tests/metadata.spec.ts @@ -48,7 +48,7 @@ describe("Metadata Protocol", function () { const nwaku1Ma = await nwaku1.getMultiaddrWithId(); const nwaku1PeerId = await nwaku1.getPeerId(); - waku = await createLightNode({ shardInfo }); + waku = await createLightNode({ networkConfig: shardInfo }); await waku.start(); await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec); @@ -95,7 +95,7 @@ describe("Metadata Protocol", function () { const nwaku1Ma = await nwaku1.getMultiaddrWithId(); const nwaku1PeerId = await nwaku1.getPeerId(); - waku = await createLightNode({ shardInfo: shardInfo2 }); + waku = await createLightNode({ networkConfig: shardInfo2 }); await waku.start(); await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec); @@ -141,7 +141,7 @@ describe("Metadata Protocol", function () { const nwaku1Ma = await nwaku1.getMultiaddrWithId(); - waku = await createLightNode({ shardInfo: shardInfo2 }); + waku = await createLightNode({ networkConfig: shardInfo2 }); await waku.start(); await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec); @@ -179,7 +179,7 @@ describe("Metadata Protocol", function () { const nwaku1Ma = await nwaku1.getMultiaddrWithId(); - waku = await createLightNode({ shardInfo: shardInfo2 }); + waku = await createLightNode({ networkConfig: shardInfo2 }); await waku.start(); await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec); @@ -215,7 +215,7 @@ describe("Metadata Protocol", function () { const nwaku1Ma = await nwaku1.getMultiaddrWithId(); const nwaku1PeerId = await nwaku1.getPeerId(); - waku = await createLightNode({ shardInfo }); + waku = await createLightNode({ networkConfig: shardInfo }); await waku.start(); await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec); @@ -251,7 +251,10 @@ describe("Metadata Protocol", function () { const nwaku1Ma = await nwaku1.getMultiaddrWithId(); const nwaku1PeerId = await nwaku1.getPeerId(); - waku = await createLightNode({ shardInfo, pingKeepAlive: 1 }); + waku = await createLightNode({ + networkConfig: shardInfo, + pingKeepAlive: 1 + }); await waku.start(); await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec); diff --git a/packages/tests/tests/peer-exchange/compliance.spec.ts b/packages/tests/tests/peer-exchange/compliance.spec.ts index cd30097ae0..a7e43bdb24 100644 --- a/packages/tests/tests/peer-exchange/compliance.spec.ts +++ b/packages/tests/tests/peer-exchange/compliance.spec.ts @@ -41,7 +41,7 @@ describe("Peer Exchange", function () { tests({ async setup() { - waku = await createLightNode({ shardInfo: DefaultTestShardInfo }); + waku = await createLightNode({ networkConfig: DefaultTestShardInfo }); await waku.start(); const nwaku2Ma = await nwaku2.getMultiaddrWithId(); diff --git a/packages/tests/tests/peer-exchange/index.spec.ts b/packages/tests/tests/peer-exchange/index.spec.ts index 101dc6b2da..9cd019e1a9 100644 --- a/packages/tests/tests/peer-exchange/index.spec.ts +++ b/packages/tests/tests/peer-exchange/index.spec.ts @@ -53,7 +53,7 @@ describe("Peer Exchange", function () { it("getPeersByDiscovery", async function () { waku = await createLightNode({ - shardInfo: DefaultTestShardInfo, + networkConfig: DefaultTestShardInfo, libp2p: { peerDiscovery: [ bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }), diff --git a/packages/tests/tests/peer-exchange/pe.optional.spec.ts b/packages/tests/tests/peer-exchange/pe.optional.spec.ts index 01f9d40da1..608d5f5409 100644 --- a/packages/tests/tests/peer-exchange/pe.optional.spec.ts +++ b/packages/tests/tests/peer-exchange/pe.optional.spec.ts @@ -49,7 +49,7 @@ describe("Peer Exchange", () => { wakuPeerExchangeDiscovery([pubsubTopic]) ] }, - shardInfo: shardInfo + networkConfig: shardInfo }); await waku.start(); diff --git a/packages/tests/tests/relay/interop.node.spec.ts b/packages/tests/tests/relay/interop.node.spec.ts index ed6128bf52..e071cf830e 100644 --- a/packages/tests/tests/relay/interop.node.spec.ts +++ b/packages/tests/tests/relay/interop.node.spec.ts @@ -101,7 +101,7 @@ describe("Waku Relay, Interop", function () { const waku2 = await createRelayNode({ staticNoiseKey: NOISE_KEY_2, emitSelf: true, - shardInfo: TestShardInfo + networkConfig: TestShardInfo }); await waku2.start(); diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index 1ca29a2411..58dbe9e553 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -93,16 +93,16 @@ describe("Waku Relay, multiple pubsub topics", function () { [waku1, waku2, waku3] = await Promise.all([ createRelayNode({ - shardInfo: testItem.shardInfo, + networkConfig: testItem.shardInfo, staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - shardInfo: testItem.shardInfo, + networkConfig: testItem.shardInfo, staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - shardInfo: testItem.shardInfo, + networkConfig: testItem.shardInfo, staticNoiseKey: NOISE_KEY_3 }).then((waku) => waku.start().then(() => waku)) ]); @@ -200,16 +200,16 @@ describe("Waku Relay, multiple pubsub topics", function () { // Waku1 and waku2 are using multiple pubsub topis [waku1, waku2, waku3] = await Promise.all([ createRelayNode({ - shardInfo: shardInfoBothShards, + networkConfig: shardInfoBothShards, staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - shardInfo: shardInfoBothShards, + networkConfig: shardInfoBothShards, staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - shardInfo: shardInfo1, + networkConfig: shardInfo1, staticNoiseKey: NOISE_KEY_3 }).then((waku) => waku.start().then(() => waku)) ]); @@ -269,11 +269,11 @@ describe("Waku Relay, multiple pubsub topics", function () { it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () { [waku1, waku2, waku3] = await Promise.all([ createRelayNode({ - shardInfo: shardInfo1, + networkConfig: shardInfo1, staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - shardInfo: shardInfo1, + networkConfig: shardInfo1, staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)), @@ -398,16 +398,16 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { [waku1, waku2, waku3] = await Promise.all([ createRelayNode({ - shardInfo: testItem.shardInfo, + networkConfig: testItem.shardInfo, staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - shardInfo: testItem.shardInfo, + networkConfig: testItem.shardInfo, staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - shardInfo: testItem.shardInfo, + networkConfig: testItem.shardInfo, staticNoiseKey: NOISE_KEY_3 }).then((waku) => waku.start().then(() => waku)) ]); @@ -514,16 +514,16 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { // Waku1 and waku2 are using multiple pubsub topis [waku1, waku2, waku3] = await Promise.all([ createRelayNode({ - shardInfo: contentTopicInfoBothShards, + networkConfig: contentTopicInfoBothShards, staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - shardInfo: contentTopicInfoBothShards, + networkConfig: contentTopicInfoBothShards, staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - shardInfo: contentTopicInfo1, + networkConfig: contentTopicInfo1, staticNoiseKey: NOISE_KEY_3 }).then((waku) => waku.start().then(() => waku)) ]); @@ -610,11 +610,11 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () { [waku1, waku2, waku3] = await Promise.all([ createRelayNode({ - shardInfo: contentTopicInfo1, + networkConfig: contentTopicInfo1, staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - shardInfo: contentTopicInfo1, + networkConfig: contentTopicInfo1, staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)), @@ -667,287 +667,3 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { expect(waku2ReceivedMsg.pubsubTopic).to.eq(autoshardingPubsubTopic1); }); }); - -describe("Waku Relay (named sharding), multiple pubsub topics", function () { - this.timeout(15000); - let waku1: RelayNode; - let waku2: RelayNode; - let waku3: RelayNode; - - const customPubsubTopic1 = singleShardInfoToPubsubTopic({ - clusterId: 3, - shard: 1 - }); - const customPubsubTopic2 = singleShardInfoToPubsubTopic({ - clusterId: 3, - shard: 2 - }); - const customContentTopic1 = "/test/2/waku-relay/utf8"; - const customContentTopic2 = "/test/3/waku-relay/utf8"; - const customEncoder1 = createEncoder({ - pubsubTopic: customPubsubTopic1, - contentTopic: customContentTopic1 - }); - const customDecoder1 = createDecoder(customContentTopic1, customPubsubTopic1); - const customEncoder2 = createEncoder({ - pubsubTopic: customPubsubTopic2, - contentTopic: customContentTopic2 - }); - const customDecoder2 = createDecoder(customContentTopic2, customPubsubTopic2); - - afterEachCustom(this, async () => { - await tearDownNodes([], [waku1, waku2, waku3]); - }); - - [ - { - pubsub: customPubsubTopic1, - encoder: customEncoder1, - decoder: customDecoder1 - }, - { - pubsub: customPubsubTopic2, - encoder: customEncoder2, - decoder: customDecoder2 - } - ].forEach((testItem) => { - it(`3 nodes on ${testItem.pubsub} topic`, async function () { - const [msgCollector1, msgCollector2, msgCollector3] = Array(3) - .fill(null) - .map(() => new MessageCollector()); - - [waku1, waku2, waku3] = await Promise.all([ - createRelayNode({ - pubsubTopics: [testItem.pubsub], - staticNoiseKey: NOISE_KEY_1 - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - pubsubTopics: [testItem.pubsub], - staticNoiseKey: NOISE_KEY_2, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - pubsubTopics: [testItem.pubsub], - staticNoiseKey: NOISE_KEY_3 - }).then((waku) => waku.start().then(() => waku)) - ]); - - await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await Promise.all([ - waku1.dial(waku2.libp2p.peerId), - waku3.dial(waku2.libp2p.peerId) - ]); - - await Promise.all([ - waitForRemotePeer(waku1, [Protocols.Relay]), - waitForRemotePeer(waku2, [Protocols.Relay]), - waitForRemotePeer(waku3, [Protocols.Relay]) - ]); - - await waku1.relay.subscribeWithUnsubscribe( - [testItem.decoder], - msgCollector1.callback - ); - await waku2.relay.subscribeWithUnsubscribe( - [testItem.decoder], - msgCollector2.callback - ); - await waku3.relay.subscribeWithUnsubscribe( - [testItem.decoder], - msgCollector3.callback - ); - - // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network - const relayResponse1 = await waku1.relay.send(testItem.encoder, { - payload: utf8ToBytes("M1") - }); - const relayResponse2 = await waku2.relay.send(testItem.encoder, { - payload: utf8ToBytes("M2") - }); - const relayResponse3 = await waku3.relay.send(testItem.encoder, { - payload: utf8ToBytes("M3") - }); - - expect(relayResponse1.successes[0].toString()).to.eq( - waku2.libp2p.peerId.toString() - ); - expect(relayResponse3.successes[0].toString()).to.eq( - waku2.libp2p.peerId.toString() - ); - expect(relayResponse2.successes.map((r) => r.toString())).to.include( - waku1.libp2p.peerId.toString() - ); - expect(relayResponse2.successes.map((r) => r.toString())).to.include( - waku3.libp2p.peerId.toString() - ); - - expect(await msgCollector1.waitForMessages(2, { exact: true })).to.eq( - true - ); - expect(await msgCollector2.waitForMessages(2, { exact: true })).to.eq( - true - ); - expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq( - true - ); - - expect( - msgCollector1.hasMessage(testItem.encoder.contentTopic, "M2") - ).to.eq(true); - expect( - msgCollector1.hasMessage(testItem.encoder.contentTopic, "M3") - ).to.eq(true); - expect( - msgCollector2.hasMessage(testItem.encoder.contentTopic, "M1") - ).to.eq(true); - expect( - msgCollector2.hasMessage(testItem.encoder.contentTopic, "M3") - ).to.eq(true); - expect( - msgCollector3.hasMessage(testItem.encoder.contentTopic, "M1") - ).to.eq(true); - expect( - msgCollector3.hasMessage(testItem.encoder.contentTopic, "M2") - ).to.eq(true); - }); - }); - - it("Nodes with multiple pubsub topic", async function () { - const [msgCollector1, msgCollector2, msgCollector3] = Array(3) - .fill(null) - .map(() => new MessageCollector()); - - // Waku1 and waku2 are using multiple pubsub topis - [waku1, waku2, waku3] = await Promise.all([ - createRelayNode({ - pubsubTopics: [customPubsubTopic1, customPubsubTopic2], - staticNoiseKey: NOISE_KEY_1 - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - pubsubTopics: [customPubsubTopic1, customPubsubTopic2], - staticNoiseKey: NOISE_KEY_2, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - pubsubTopics: [customPubsubTopic1], - staticNoiseKey: NOISE_KEY_3 - }).then((waku) => waku.start().then(() => waku)) - ]); - - await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await Promise.all([ - waku1.dial(waku2.libp2p.peerId), - waku3.dial(waku2.libp2p.peerId) - ]); - - await Promise.all([ - waitForRemotePeer(waku1, [Protocols.Relay]), - waitForRemotePeer(waku2, [Protocols.Relay]), - waitForRemotePeer(waku3, [Protocols.Relay]) - ]); - - await waku1.relay.subscribeWithUnsubscribe( - [customDecoder1, customDecoder2], - msgCollector1.callback - ); - await waku2.relay.subscribeWithUnsubscribe( - [customDecoder1, customDecoder2], - msgCollector2.callback - ); - await waku3.relay.subscribeWithUnsubscribe( - [customDecoder1], - msgCollector3.callback - ); - - // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network - // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic - await waku1.relay.send(customEncoder1, { payload: utf8ToBytes("M1") }); - await waku1.relay.send(customEncoder2, { payload: utf8ToBytes("M2") }); - await waku2.relay.send(customEncoder1, { payload: utf8ToBytes("M3") }); - await waku2.relay.send(customEncoder2, { payload: utf8ToBytes("M4") }); - await waku3.relay.send(customEncoder1, { payload: utf8ToBytes("M5") }); - await waku3.relay.send(customEncoder2, { payload: utf8ToBytes("M6") }); - - expect(await msgCollector1.waitForMessages(3, { exact: true })).to.eq(true); - expect(await msgCollector2.waitForMessages(3, { exact: true })).to.eq(true); - expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(true); - expect(msgCollector1.hasMessage(customContentTopic1, "M3")).to.eq(true); - expect(msgCollector1.hasMessage(customContentTopic2, "M4")).to.eq(true); - expect(msgCollector1.hasMessage(customContentTopic1, "M5")).to.eq(true); - expect(msgCollector2.hasMessage(customContentTopic1, "M1")).to.eq(true); - expect(msgCollector2.hasMessage(customContentTopic2, "M2")).to.eq(true); - expect(msgCollector2.hasMessage(customContentTopic1, "M5")).to.eq(true); - expect(msgCollector3.hasMessage(customContentTopic1, "M1")).to.eq(true); - expect(msgCollector3.hasMessage(customContentTopic1, "M3")).to.eq(true); - }); - - it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () { - [waku1, waku2, waku3] = await Promise.all([ - createRelayNode({ - pubsubTopics: [customPubsubTopic1], - staticNoiseKey: NOISE_KEY_1 - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - pubsubTopics: [customPubsubTopic1], - staticNoiseKey: NOISE_KEY_2, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - staticNoiseKey: NOISE_KEY_3 - }).then((waku) => waku.start().then(() => waku)) - ]); - - await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await Promise.all([ - waku1.dial(waku2.libp2p.peerId), - waku3.dial(waku2.libp2p.peerId) - ]); - - await Promise.all([ - waitForRemotePeer(waku1, [Protocols.Relay]), - waitForRemotePeer(waku2, [Protocols.Relay]) - ]); - - const messageText = "Communicating using a custom pubsub topic"; - - const waku2ReceivedMsgPromise: Promise = new Promise( - (resolve) => { - void waku2.relay.subscribeWithUnsubscribe([customDecoder1], resolve); - } - ); - - // The promise **fails** if we receive a message on the default - // pubsub topic. - const waku3NoMsgPromise: Promise = new Promise( - (resolve, reject) => { - void waku3.relay.subscribeWithUnsubscribe([TestDecoder], reject); - setTimeout(resolve, 1000); - } - ); - - await waku1.relay.send(customEncoder1, { - payload: utf8ToBytes(messageText) - }); - - const waku2ReceivedMsg = await waku2ReceivedMsgPromise; - await waku3NoMsgPromise; - - expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText); - expect(waku2ReceivedMsg.pubsubTopic).to.eq(customPubsubTopic1); - }); -}); diff --git a/packages/tests/tests/relay/subscribe.node.spec.ts b/packages/tests/tests/relay/subscribe.node.spec.ts index a147c4ad68..84c1fceefc 100644 --- a/packages/tests/tests/relay/subscribe.node.spec.ts +++ b/packages/tests/tests/relay/subscribe.node.spec.ts @@ -65,7 +65,7 @@ describe("Waku Relay, Subscribe", function () { try { const waku = await createRelayNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: TestShardInfo + networkConfig: TestShardInfo }); await waku.start(); diff --git a/packages/tests/tests/relay/utils.ts b/packages/tests/tests/relay/utils.ts index df8cfeee2f..e6ab379eee 100644 --- a/packages/tests/tests/relay/utils.ts +++ b/packages/tests/tests/relay/utils.ts @@ -1,9 +1,9 @@ import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import { + NetworkConfig, Protocols, RelayNode, - ShardInfo, - ShardingParams + ShardInfo } from "@waku/interfaces"; import { createRelayNode } from "@waku/sdk/relay"; import { contentTopicToPubsubTopic, Logger } from "@waku/utils"; @@ -51,10 +51,10 @@ export async function waitForAllRemotePeers( export const runRelayNodes = ( context: Context, - shardInfo: ShardingParams + networkConfig: NetworkConfig ): Promise<[ServiceNode, RelayNode]> => runNodes({ - shardInfo, + networkConfig, context, protocols: RELAY_PROTOCOLS, createNode: createRelayNode @@ -65,11 +65,11 @@ export async function runJSNodes(): Promise<[RelayNode, RelayNode]> { const [waku1, waku2] = await Promise.all([ createRelayNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: TestShardInfo + networkConfig: TestShardInfo }).then((waku) => waku.start().then(() => waku)), createRelayNode({ staticNoiseKey: NOISE_KEY_2, - shardInfo: TestShardInfo, + networkConfig: TestShardInfo, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)) ]); diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index adac109042..91e58979b7 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -55,7 +55,7 @@ describe("Autosharding: Running Nodes", function () { await nwaku.ensureSubscriptions(pubsubTopics); waku = await createLightNode({ - shardInfo: { + networkConfig: { clusterId: clusterId, contentTopics: [ContentTopic] } @@ -97,7 +97,7 @@ describe("Autosharding: Running Nodes", function () { await nwaku.ensureSubscriptions(pubsubTopics); waku = await createLightNode({ - shardInfo: { + networkConfig: { clusterId: clusterId, contentTopics: [ContentTopic] } @@ -153,7 +153,7 @@ describe("Autosharding: Running Nodes", function () { }); waku = await createLightNode({ - shardInfo: { + networkConfig: { clusterId: clusterId, contentTopics: [ContentTopic] } @@ -216,7 +216,7 @@ describe("Autosharding: Running Nodes", function () { }); waku = await createLightNode({ - shardInfo: { + networkConfig: { clusterId: clusterId, // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards contentTopics: [ContentTopic, ContentTopic2] @@ -274,7 +274,7 @@ describe("Autosharding: Running Nodes", function () { }); waku = await createLightNode({ - shardInfo: { + networkConfig: { clusterId: clusterId, contentTopics: [ContentTopic] } @@ -301,52 +301,10 @@ describe("Autosharding: Running Nodes", function () { expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); }); - it("start node with ApplicationInfo", async function () { - const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; - - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: pubsubTopics, - contentTopic: [ContentTopic] - }); - - waku = await createLightNode({ - shardInfo: { - clusterId: clusterId, - application: ContentTopic.split("/")[1], - version: ContentTopic.split("/")[2] - } - }); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const encoder = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(ContentTopic) - } - }); - - const request = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - - expect(request.successes.length).to.eq(1); - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: ContentTopic - }) - ).to.eq(true); - }); - it("start node with empty content topic", async function () { try { waku = await createLightNode({ - shardInfo: { + networkConfig: { clusterId: clusterId, contentTopics: [] } @@ -358,7 +316,7 @@ describe("Autosharding: Running Nodes", function () { if ( !(err instanceof Error) || !err.message.includes( - "Missing minimum required configuration options for static sharding or autosharding" + "Invalid content topics configuration: please provide at least one content topic" ) ) { throw err; diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index 64bd342dde..c912f475ae 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -91,7 +91,7 @@ describe("Static Sharding: Peer Management", function () { const nwaku3Ma = await nwaku3.getMultiaddrWithId(); waku = await createLightNode({ - shardInfo: shardInfo, + networkConfig: shardInfo, libp2p: { peerDiscovery: [ bootstrap({ list: [nwaku3Ma.toString()] }), @@ -173,7 +173,7 @@ describe("Static Sharding: Peer Management", function () { const nwaku3Ma = await nwaku3.getMultiaddrWithId(); waku = await createLightNode({ - shardInfo: shardInfoToDial, + networkConfig: shardInfoToDial, libp2p: { peerDiscovery: [ bootstrap({ list: [nwaku3Ma.toString()] }), @@ -277,7 +277,7 @@ describe("Autosharding: Peer Management", function () { const nwaku3Ma = await nwaku3.getMultiaddrWithId(); waku = await createLightNode({ - shardInfo: contentTopicInfo, + networkConfig: contentTopicInfo, libp2p: { peerDiscovery: [ bootstrap({ list: [nwaku3Ma.toString()] }), @@ -359,7 +359,7 @@ describe("Autosharding: Peer Management", function () { const nwaku3Ma = await nwaku3.getMultiaddrWithId(); waku = await createLightNode({ - shardInfo: contentTopicInfoToDial, + networkConfig: contentTopicInfoToDial, libp2p: { peerDiscovery: [ bootstrap({ list: [nwaku3Ma.toString()] }), diff --git a/packages/tests/tests/sharding/static_sharding.spec.ts b/packages/tests/tests/sharding/static_sharding.spec.ts index c51e37cfcd..47cca240a5 100644 --- a/packages/tests/tests/sharding/static_sharding.spec.ts +++ b/packages/tests/tests/sharding/static_sharding.spec.ts @@ -59,7 +59,7 @@ describe("Static Sharding: Running Nodes", function () { await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo)); waku = await createLightNode({ - shardInfo: shardInfo + networkConfig: shardInfo }); await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.LightPush]); @@ -99,7 +99,7 @@ describe("Static Sharding: Running Nodes", function () { await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo)); waku = await createLightNode({ - shardInfo: shardInfo + networkConfig: shardInfo }); await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.LightPush]); @@ -148,7 +148,7 @@ describe("Static Sharding: Running Nodes", function () { }); waku = await createLightNode({ - shardInfo: shardInfo + networkConfig: shardInfo }); await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.LightPush]); @@ -214,7 +214,7 @@ describe("Static Sharding: Running Nodes", function () { it("configure the node with multiple pubsub topics", async function () { waku = await createLightNode({ - shardInfo: shardInfoBothShards + networkConfig: shardInfoBothShards }); await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.LightPush]); @@ -253,7 +253,7 @@ describe("Static Sharding: Running Nodes", function () { it("using a protocol with unconfigured pubsub topic should fail", async function () { this.timeout(15_000); waku = await createLightNode({ - shardInfo: shardInfoFirstShard + networkConfig: shardInfoFirstShard }); // use a pubsub topic that is not configured @@ -274,10 +274,10 @@ describe("Static Sharding: Running Nodes", function () { expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); }); - it("start node with empty shard", async function () { + it("start node with empty shard should fail", async function () { try { waku = await createLightNode({ - shardInfo: { clusterId: clusterId, shards: [] } + networkConfig: { clusterId: clusterId, shards: [] } }); throw new Error( "Starting the node with no shard should've thrown an error" @@ -286,7 +286,7 @@ describe("Static Sharding: Running Nodes", function () { if ( !(err instanceof Error) || !err.message.includes( - "Missing minimum required configuration options for static sharding or autosharding" + "Invalid shards configuration: please provide at least one shard" ) ) { throw err; diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index e2f42df56e..2a98793cb2 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -257,7 +257,7 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { waku = await createLightNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: contentTopicInfoBothShards + networkConfig: contentTopicInfoBothShards }); await waku.start(); diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts index 4e7fddb993..248b9509eb 100644 --- a/packages/tests/tests/store/utils.ts +++ b/packages/tests/tests/store/utils.ts @@ -6,9 +6,9 @@ import { } from "@waku/core"; import { LightNode, + NetworkConfig, Protocols, ShardInfo, - ShardingParams, type SingleShardInfo } from "@waku/interfaces"; import { createLightNode, waitForRemotePeer } from "@waku/sdk"; @@ -102,12 +102,12 @@ export async function processQueriedMessages( export async function startAndConnectLightNode( instance: ServiceNode, - shardInfo: ShardingParams + networkConfig: NetworkConfig ): Promise { const waku = await createLightNode({ staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, - shardInfo: shardInfo + networkConfig: networkConfig }); await waku.start(); await waku.dial(await instance.getMultiaddrWithId()); @@ -145,11 +145,11 @@ export const adjustDate = (baseDate: Date, adjustMs: number): Date => { export const runStoreNodes = ( context: Context, - shardInfo: ShardingParams + networkConfig: NetworkConfig ): Promise<[ServiceNode, LightNode]> => runNodes({ context, - shardInfo, + networkConfig, createNode: createLightNode, protocols: [Protocols.Store] }); diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index 88dbb34377..8fab283d75 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -57,7 +57,7 @@ describe("Wait for remote peer", function () { waku1 = await createRelayNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }); await waku1.start(); @@ -77,7 +77,7 @@ describe("Wait for remote peer", function () { this.timeout(5000); createRelayNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }) .then((waku1) => waku1.start().then(() => waku1)) .then((waku1) => { @@ -107,7 +107,7 @@ describe("Wait for remote peer", function () { waku2 = await createLightNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }); await waku2.start(); await waku2.dial(multiAddrWithId); @@ -136,7 +136,7 @@ describe("Wait for remote peer", function () { waku2 = await createLightNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }); await waku2.start(); const waitPromise = waitForRemotePeer(waku2, [Protocols.Store], 2000); @@ -167,7 +167,7 @@ describe("Wait for remote peer", function () { waku2 = await createLightNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }); await waku2.start(); await waku2.dial(multiAddrWithId); @@ -196,7 +196,7 @@ describe("Wait for remote peer", function () { waku2 = await createLightNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }); await waku2.start(); await waku2.dial(multiAddrWithId); @@ -225,7 +225,7 @@ describe("Wait for remote peer", function () { waku2 = await createLightNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }); await waku2.start(); await waku2.dial(multiAddrWithId); diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index e2c43c33e9..531ebddc0a 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -54,7 +54,7 @@ describe("Waku Dial [node only]", function () { waku = await createLightNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }); await waku.start(); await waku.dial(multiAddrWithId); @@ -88,7 +88,7 @@ describe("Waku Dial [node only]", function () { waku = await createLightNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }); await waku.start(); await waku.dial(multiAddrWithId); @@ -116,7 +116,7 @@ describe("Waku Dial [node only]", function () { const multiAddrWithId = await nwaku.getMultiaddrWithId(); waku = await createLightNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo, + networkConfig: DefaultTestShardInfo, libp2p: { peerDiscovery: [bootstrap({ list: [multiAddrWithId.toString()] })] } @@ -142,7 +142,7 @@ describe("Waku Dial [node only]", function () { waku = await createLightNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo, + networkConfig: DefaultTestShardInfo, libp2p: { peerDiscovery: [bootstrap({ list: [nwakuMa.toString()] })] } @@ -174,11 +174,11 @@ describe("Decryption Keys", function () { [waku1, waku2] = await Promise.all([ createRelayNode({ staticNoiseKey: NOISE_KEY_1, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }).then((waku) => waku.start().then(() => waku)), createRelayNode({ staticNoiseKey: NOISE_KEY_2, - shardInfo: DefaultTestShardInfo, + networkConfig: DefaultTestShardInfo, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)) ]); @@ -254,11 +254,11 @@ describe("User Agent", function () { createRelayNode({ staticNoiseKey: NOISE_KEY_1, userAgent: waku1UserAgent, - shardInfo: DefaultTestShardInfo + networkConfig: DefaultTestShardInfo }).then((waku) => waku.start().then(() => waku)), createRelayNode({ staticNoiseKey: NOISE_KEY_2, - shardInfo: DefaultTestShardInfo, + networkConfig: DefaultTestShardInfo, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)) ]); diff --git a/packages/utils/src/common/index.ts b/packages/utils/src/common/index.ts index 025a06f03c..c0384f38c7 100644 --- a/packages/utils/src/common/index.ts +++ b/packages/utils/src/common/index.ts @@ -4,7 +4,7 @@ export * from "./random_subset.js"; export * from "./group_by.js"; export * from "./to_async_iterator.js"; export * from "./is_size_valid.js"; -export * from "./sharding.js"; +export * from "./sharding/index.js"; export * from "./push_or_init_map.js"; export * from "./relay_shard_codec.js"; export * from "./delay.js"; diff --git a/packages/utils/src/common/sharding.spec.ts b/packages/utils/src/common/sharding/index.spec.ts similarity index 89% rename from packages/utils/src/common/sharding.spec.ts rename to packages/utils/src/common/sharding/index.spec.ts index 8e4e10e00f..2e5da8d754 100644 --- a/packages/utils/src/common/sharding.spec.ts +++ b/packages/utils/src/common/sharding/index.spec.ts @@ -1,4 +1,4 @@ -import { DEFAULT_CLUSTER_ID } from "@waku/interfaces"; +import { DEFAULT_CLUSTER_ID, NetworkConfig } from "@waku/interfaces"; import { expect } from "chai"; import { @@ -13,7 +13,7 @@ import { shardInfoToPubsubTopics, singleShardInfosToShardInfo, singleShardInfoToPubsubTopic -} from "./sharding"; +} from "."; const testInvalidCases = ( contentTopics: string[], @@ -284,13 +284,6 @@ describe("shardInfoToPubsubTopics", () => { expect(topics.length).to.equal(2); }); - it("should handle application and version for autosharding", () => { - const shardInfo = { application: "app", version: "v1" }; - const topics = shardInfoToPubsubTopics(shardInfo); - expect(topics).to.be.an("array").that.includes("/waku/2/rs/1/4"); - expect(topics.length).to.equal(1); - }); - [0, 1, 6].forEach((clusterId) => { it(`should handle clusterId, application and version for autosharding with cluster iD ${clusterId}`, () => { const shardInfo = { @@ -431,7 +424,7 @@ describe("ensureShardingConfigured", () => { it("should return valid sharding parameters for static sharding", () => { const shardInfo = { clusterId: 1, shards: [0, 1] }; const result = ensureShardingConfigured(shardInfo); - expect(result.shardingParams).to.deep.include({ + expect(result.shardInfo).to.deep.include({ clusterId: 1, shards: [0, 1] }); @@ -443,11 +436,8 @@ describe("ensureShardingConfigured", () => { }); it("should return valid sharding parameters for content topics autosharding", () => { - const shardInfo = { contentTopics: ["/app/v1/topic1/proto"] }; - const result = ensureShardingConfigured(shardInfo); - expect(result.shardingParams).to.deep.include({ - contentTopics: ["/app/v1/topic1/proto"] - }); + const contentTopicInfo = { contentTopics: ["/app/v1/topic1/proto"] }; + const result = ensureShardingConfigured(contentTopicInfo); const expectedPubsubTopic = contentTopicToPubsubTopic( "/app/v1/topic1/proto", DEFAULT_CLUSTER_ID @@ -458,47 +448,8 @@ describe("ensureShardingConfigured", () => { expect(result.pubsubTopics).to.include(expectedPubsubTopic); }); - it("should configure sharding based on application and version for autosharding", () => { - const shardInfo = { application: "app", version: "v1" }; - const result = ensureShardingConfigured(shardInfo); - expect(result.shardingParams).to.deep.include({ - application: "app", - version: "v1" - }); - const expectedPubsubTopic = contentTopicToPubsubTopic( - `/app/v1/default/default` - ); - expect(result.pubsubTopics).to.include(expectedPubsubTopic); - expect(result.shardInfo.shards).to.include( - pubsubTopicToSingleShardInfo(expectedPubsubTopic).shard - ); - }); - - [0, 1, 4].forEach((clusterId) => { - it(`should configure sharding based on clusterId, application and version for autosharding with cluster iD ${clusterId}`, () => { - const shardInfo = { - clusterId: clusterId, - application: "app", - version: "v1" - }; - const result = ensureShardingConfigured(shardInfo); - expect(result.shardingParams).to.deep.include({ - application: "app", - version: "v1" - }); - const expectedPubsubTopic = contentTopicToPubsubTopic( - `/app/v1/default/default`, - shardInfo.clusterId - ); - expect(result.pubsubTopics).to.include(expectedPubsubTopic); - expect(result.shardInfo.shards).to.include( - pubsubTopicToSingleShardInfo(expectedPubsubTopic).shard - ); - }); - }); - it("should throw an error for missing sharding configuration", () => { - const shardInfo = {}; + const shardInfo = {} as any as NetworkConfig; expect(() => ensureShardingConfigured(shardInfo)).to.throw(); }); diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding/index.ts similarity index 80% rename from packages/utils/src/common/sharding.ts rename to packages/utils/src/common/sharding/index.ts index 892c9e5cf7..c30b4ac6e7 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding/index.ts @@ -1,13 +1,43 @@ import { sha256 } from "@noble/hashes/sha256"; import { DEFAULT_CLUSTER_ID, + NetworkConfig, PubsubTopic, ShardInfo, - ShardingParams, SingleShardInfo } from "@waku/interfaces"; -import { concat, utf8ToBytes } from "../bytes/index.js"; +import { concat, utf8ToBytes } from "../../bytes/index.js"; + +import { isAutoSharding, isStaticSharding } from "./type_guards.js"; + +export * from "./type_guards.js"; + +export function derivePubsubTopicsFromNetworkConfig( + networkConfig: NetworkConfig +): PubsubTopic[] { + if (isStaticSharding(networkConfig)) { + if (networkConfig.shards.length === 0) { + throw new Error( + "Invalid shards configuration: please provide at least one shard" + ); + } + return shardInfoToPubsubTopics(networkConfig); + } else if (isAutoSharding(networkConfig)) { + if (networkConfig.contentTopics.length === 0) { + throw new Error( + "Invalid content topics configuration: please provide at least one content topic" + ); + } + return networkConfig.contentTopics.map((contentTopic) => + contentTopicToPubsubTopic(contentTopic, networkConfig.clusterId) + ); + } else { + throw new Error( + "Unknown shard config. Please use ShardInfo or ContentTopicInfo" + ); + } +} export const singleShardInfoToPubsubTopic = ( shardInfo: SingleShardInfo @@ -38,7 +68,7 @@ export const singleShardInfosToShardInfo = ( }; export const shardInfoToPubsubTopics = ( - shardInfo: Partial + shardInfo: Partial ): PubsubTopic[] => { if ("contentTopics" in shardInfo && shardInfo.contentTopics) { // Autosharding: explicitly defined content topics @@ -98,6 +128,39 @@ export const pubsubTopicToSingleShardInfo = ( }; }; +export const pubsubTopicsToShardInfo = ( + pubsubTopics: PubsubTopic[] +): ShardInfo => { + const shardInfoSet = new Set(); + const clusterIds = new Set(); + + for (const topic of pubsubTopics) { + const { clusterId, shard } = pubsubTopicToSingleShardInfo(topic); + shardInfoSet.add(`${clusterId}:${shard}`); + clusterIds.add(clusterId); + } + + if (shardInfoSet.size === 0) { + throw new Error("No valid pubsub topics provided"); + } + + if (clusterIds.size > 1) { + throw new Error( + "Pubsub topics from multiple cluster IDs are not supported" + ); + } + + const clusterId = clusterIds.values().next().value; + const shards = Array.from(shardInfoSet).map((info) => + parseInt(info.split(":")[1]) + ); + + return { + clusterId, + shards + }; +}; + //TODO: move part of BaseProtocol instead of utils // return `ProtocolError.TOPIC_NOT_CONFIGURED` instead of throwing export function ensurePubsubTopicIsConfigured( @@ -248,28 +311,21 @@ export function determinePubsubTopic( * @returns Validated sharding parameters, with any missing values set to defaults */ export const ensureShardingConfigured = ( - shardInfo: Partial + networkConfig: NetworkConfig ): { - shardingParams: ShardingParams; shardInfo: ShardInfo; pubsubTopics: PubsubTopic[]; } => { - const clusterId = shardInfo.clusterId ?? DEFAULT_CLUSTER_ID; - const shards = "shards" in shardInfo ? shardInfo.shards : []; + const clusterId = networkConfig.clusterId ?? DEFAULT_CLUSTER_ID; + const shards = "shards" in networkConfig ? networkConfig.shards : []; const contentTopics = - "contentTopics" in shardInfo ? shardInfo.contentTopics : []; - const [application, version] = - "application" in shardInfo && "version" in shardInfo - ? [shardInfo.application, shardInfo.version] - : [undefined, undefined]; + "contentTopics" in networkConfig ? networkConfig.contentTopics : []; const isShardsConfigured = shards && shards.length > 0; const isContentTopicsConfigured = contentTopics && contentTopics.length > 0; - const isApplicationVersionConfigured = application && version; if (isShardsConfigured) { return { - shardingParams: { clusterId, shards }, shardInfo: { clusterId, shards }, pubsubTopics: shardInfoToPubsubTopics({ clusterId, shards }) }; @@ -287,27 +343,11 @@ export const ensureShardingConfigured = ( new Set(contentTopics.map((topic) => contentTopicToShardIndex(topic))) ); return { - shardingParams: { clusterId, contentTopics }, shardInfo: { clusterId, shards }, pubsubTopics }; } - if (isApplicationVersionConfigured) { - const pubsubTopic = contentTopicToPubsubTopic( - `/${application}/${version}/default/default`, - clusterId - ); - return { - shardingParams: { clusterId, application, version }, - shardInfo: { - clusterId, - shards: [pubsubTopicToSingleShardInfo(pubsubTopic).shard!] - }, - pubsubTopics: [pubsubTopic] - }; - } - throw new Error( "Missing minimum required configuration options for static sharding or autosharding." ); diff --git a/packages/utils/src/common/sharding/type_guards.ts b/packages/utils/src/common/sharding/type_guards.ts new file mode 100644 index 0000000000..959097bf22 --- /dev/null +++ b/packages/utils/src/common/sharding/type_guards.ts @@ -0,0 +1,19 @@ +import type { + ContentTopicInfo, + ProtocolCreateOptions, + StaticSharding +} from "@waku/interfaces"; + +export function isStaticSharding( + config: NonNullable +): config is StaticSharding { + return ( + "clusterId" in config && "shards" in config && !("contentTopics" in config) + ); +} + +export function isAutoSharding( + config: NonNullable +): config is ContentTopicInfo { + return "contentTopics" in config && "clusterId" in config; +}