diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 9eb3ab57d8..ea96b2b293 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -7,7 +7,11 @@ import type { PubsubTopic } from "@waku/interfaces"; import { DefaultPubsubTopic } from "@waku/interfaces"; -import { Logger, shardInfoToPubsubTopics } from "@waku/utils"; +import { + ensureShardingConfigured, + Logger, + shardInfoToPubsubTopics +} from "@waku/utils"; import { getConnectedPeersForProtocolAndShard, getPeersForProtocol, @@ -108,6 +112,8 @@ export class BaseProtocol implements IBaseProtocol { this.peerStore, [this.multicodec], this.options?.shardInfo + ? ensureShardingConfigured(this.options.shardInfo).shardInfo + : undefined ); // Filter the peers based on discovery & number of peers requested diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index 59197357a2..f48024b3f7 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -132,7 +132,7 @@ class Metadata extends BaseProtocol implements IMetadata { } export function wakuMetadata( - shardInfo: ShardingParams + shardInfo: ShardInfo ): (components: Libp2pComponents) => IMetadata { return (components: Libp2pComponents) => new Metadata(shardInfo, components); } diff --git a/packages/interfaces/src/constants.ts b/packages/interfaces/src/constants.ts index 165476dd09..1a666c280d 100644 --- a/packages/interfaces/src/constants.ts +++ b/packages/interfaces/src/constants.ts @@ -2,3 +2,8 @@ * DefaultPubsubTopic is the default gossipsub topic to use for Waku. */ export const DefaultPubsubTopic = "/waku/2/default-waku/proto"; + +/** + * The default cluster ID for The Waku Network + */ +export const DEFAULT_CLUSTER_ID = 1; diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index 1c8348239e..1e48030288 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -5,7 +5,7 @@ export interface SingleShardInfo { /** * Specifying this field indicates to the encoder/decoder that static sharding must be used. */ - shard?: number; + shard: number; } export interface IRateLimitProof { diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 0d30c230e9..4446820278 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -60,7 +60,7 @@ export type ProtocolCreateOptions = { * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. * */ - shardInfo?: ShardingParams; + shardInfo?: Partial; /** * You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property. * This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index 15bcf4e423..4f438e45ee 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -16,18 +16,19 @@ import { wakuStore } from "@waku/core"; import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery"; -import type { - CreateLibp2pOptions, - FullNode, - IMetadata, - Libp2p, - Libp2pComponents, - LightNode, - ProtocolCreateOptions, - ShardingParams +import { + type CreateLibp2pOptions, + type FullNode, + type IMetadata, + type Libp2p, + type Libp2pComponents, + type LightNode, + type ProtocolCreateOptions, + type ShardInfo } from "@waku/interfaces"; import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay"; +import { ensureShardingConfigured } from "@waku/utils"; import { createLibp2p } from "libp2p"; const DEFAULT_NODE_REQUIREMENTS = { @@ -38,17 +39,6 @@ const DEFAULT_NODE_REQUIREMENTS = { export { Libp2pComponents }; -const ensureShardingConfigured = (shardInfo: ShardingParams): void => { - if ( - ("shards" in shardInfo && shardInfo.shards.length < 1) || - ("contentTopics" in shardInfo && shardInfo.contentTopics.length < 1) - ) { - throw new Error( - "Missing required configuration options for static sharding or autosharding." - ); - } -}; - /** * Create a Waku node configured to use autosharding or static sharding. */ @@ -61,7 +51,7 @@ export async function createNode( throw new Error("Shard info must be set"); } - ensureShardingConfigured(options.shardInfo); + const shardInfo = ensureShardingConfigured(options.shardInfo); const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; @@ -71,7 +61,7 @@ export async function createNode( } const libp2p = await defaultLibp2p( - undefined, + shardInfo.shardInfo, wakuGossipSub(options), libp2pOptions, options?.userAgent @@ -85,7 +75,7 @@ export async function createNode( options ?? {}, [], libp2p, - options.shardInfo, + shardInfo.shardInfo, store, lightPush, filter @@ -102,9 +92,9 @@ export async function createLightNode( ): Promise { options = options ?? {}; - if (options.shardInfo) { - ensureShardingConfigured(options.shardInfo); - } + const shardInfo = options.shardInfo + ? ensureShardingConfigured(options.shardInfo) + : undefined; const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; @@ -114,7 +104,7 @@ export async function createLightNode( } const libp2p = await defaultLibp2p( - options.shardInfo, + shardInfo?.shardInfo, wakuGossipSub(options), libp2pOptions, options?.userAgent @@ -128,7 +118,7 @@ export async function createLightNode( options ?? {}, options.pubsubTopics, libp2p, - options.shardInfo, + shardInfo?.shardingParams, store, lightPush, filter @@ -153,9 +143,9 @@ export async function createFullNode( ): Promise { options = options ?? {}; - if (options.shardInfo) { - ensureShardingConfigured(options.shardInfo); - } + const shardInfo = options.shardInfo + ? ensureShardingConfigured(options.shardInfo) + : undefined; const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; @@ -165,7 +155,7 @@ export async function createFullNode( } const libp2p = await defaultLibp2p( - options.shardInfo, + shardInfo?.shardInfo, wakuGossipSub(options), libp2pOptions, options?.userAgent @@ -180,7 +170,7 @@ export async function createFullNode( options ?? {}, options.pubsubTopics, libp2p, - options.shardInfo, + shardInfo?.shardingParams, store, lightPush, filter, @@ -207,7 +197,7 @@ type MetadataService = { }; export async function defaultLibp2p( - shardInfo?: ShardingParams, + shardInfo?: ShardInfo, wakuGossipSub?: PubsubService["pubsub"], options?: Partial, userAgent?: string diff --git a/packages/sdk/src/relay/index.ts b/packages/sdk/src/relay/index.ts index d02b989d93..0e1be794df 100644 --- a/packages/sdk/src/relay/index.ts +++ b/packages/sdk/src/relay/index.ts @@ -1,6 +1,7 @@ import { WakuNode, WakuOptions } from "@waku/core"; import type { ProtocolCreateOptions, RelayNode } from "@waku/interfaces"; import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay"; +import { ensureShardingConfigured } from "@waku/utils"; import { defaultLibp2p, defaultPeerDiscoveries } from "../create.js"; @@ -26,8 +27,12 @@ export async function createRelayNode( Object.assign(libp2pOptions, { peerDiscovery }); } + const shardInfo = options.shardInfo + ? ensureShardingConfigured(options.shardInfo) + : undefined; + const libp2p = await defaultLibp2p( - options.shardInfo, + shardInfo?.shardInfo, wakuGossipSub(options), libp2pOptions, options?.userAgent @@ -39,7 +44,7 @@ export async function createRelayNode( options, options.pubsubTopics, libp2p, - options.shardInfo, + shardInfo?.shardingParams, undefined, undefined, undefined, diff --git a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts index dbaad69bd9..a633c994a8 100644 --- a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts @@ -9,6 +9,7 @@ import type { import { Protocols } from "@waku/interfaces"; import { contentTopicToPubsubTopic, + contentTopicToShardIndex, pubsubTopicToSingleShardInfo, singleShardInfoToPubsubTopic } from "@waku/utils"; @@ -207,17 +208,25 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { const customEncoder1 = createEncoder({ contentTopic: customContentTopic1, pubsubTopicShardInfo: { - clusterId: 3 + clusterId: 3, + shard: contentTopicToShardIndex(customContentTopic1) } }); - const customDecoder1 = createDecoder(customContentTopic1, { clusterId: 3 }); + const customDecoder1 = createDecoder(customContentTopic1, { + clusterId: 3, + shard: contentTopicToShardIndex(customContentTopic1) + }); const customEncoder2 = createEncoder({ contentTopic: customContentTopic2, pubsubTopicShardInfo: { - clusterId: 3 + clusterId: 3, + shard: contentTopicToShardIndex(customContentTopic2) } }); - const customDecoder2 = createDecoder(customContentTopic2, { clusterId: 3 }); + const customDecoder2 = createDecoder(customContentTopic2, { + clusterId: 3, + shard: contentTopicToShardIndex(customContentTopic2) + }); this.beforeEach(async function () { this.timeout(15000); diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index 6da9d27b44..7103aac760 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -11,7 +11,7 @@ import { Tags, utf8ToBytes } from "@waku/sdk"; -import { shardInfoToPubsubTopics } from "@waku/utils"; +import { ensureShardingConfigured, shardInfoToPubsubTopics } from "@waku/utils"; import { getConnectedPeersForProtocolAndShard } from "@waku/utils/libp2p"; import { expect } from "chai"; import fc from "fast-check"; @@ -237,7 +237,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { waku.libp2p.getConnections(), waku.libp2p.peerStore, waku.libp2p.getProtocols(), - shardInfo + ensureShardingConfigured(shardInfo).shardInfo ); expect(peers.length).to.be.greaterThan(0); }); @@ -289,7 +289,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { waku.libp2p.getConnections(), waku.libp2p.peerStore, waku.libp2p.getProtocols(), - shardInfo2 + ensureShardingConfigured(shardInfo2).shardInfo ); expect(peers.length).to.be.equal(1); }); @@ -341,7 +341,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { waku.libp2p.getConnections(), waku.libp2p.peerStore, waku.libp2p.getProtocols(), - shardInfo2 + ensureShardingConfigured(shardInfo2).shardInfo ); expect(peers.length).to.be.equal(1); }); @@ -393,7 +393,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { waku.libp2p.getConnections(), waku.libp2p.peerStore, waku.libp2p.getProtocols(), - shardInfo2 + ensureShardingConfigured(shardInfo2).shardInfo ); expect(peers.length).to.be.equal(1); }); diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index a5e7745e32..b55864f9ae 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -10,6 +10,8 @@ import { } from "@waku/interfaces"; import { contentTopicToPubsubTopic, + contentTopicToShardIndex, + pubsubTopicToSingleShardInfo, singleShardInfoToPubsubTopic } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; @@ -202,11 +204,11 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { }; const customEncoder1 = createEncoder({ contentTopic: customContentTopic1, - pubsubTopicShardInfo: shardInfo + pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(autoshardingPubsubTopic1) }); const customEncoder2 = createEncoder({ contentTopic: customContentTopic2, - pubsubTopicShardInfo: shardInfo + pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) }); let nimPeerId: PeerId; @@ -356,12 +358,16 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () const customEncoder1 = createEncoder({ contentTopic: customContentTopic1, pubsubTopicShardInfo: { - clusterId + clusterId, + shard: contentTopicToShardIndex(customContentTopic1) } }); const customEncoder2 = createEncoder({ contentTopic: customContentTopic2, - pubsubTopicShardInfo: { clusterId } + pubsubTopicShardInfo: { + clusterId, + shard: contentTopicToShardIndex(customContentTopic2) + } }); let nimPeerId: PeerId; diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index f9d7fcc582..c6e9a4426d 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -14,6 +14,7 @@ import { Protocols } from "@waku/interfaces"; import { createRelayNode } from "@waku/sdk/relay"; import { contentTopicToPubsubTopic, + pubsubTopicToSingleShardInfo, singleShardInfoToPubsubTopic } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; @@ -340,16 +341,20 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { }; const customEncoder1 = createEncoder({ contentTopic: customContentTopic1, - pubsubTopicShardInfo: { - clusterId: 3 - } + pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(autoshardingPubsubTopic1) }); - const customDecoder1 = createDecoder(customContentTopic1, { clusterId: 3 }); + const customDecoder1 = createDecoder( + customContentTopic1, + pubsubTopicToSingleShardInfo(autoshardingPubsubTopic1) + ); const customEncoder2 = createEncoder({ contentTopic: customContentTopic2, - pubsubTopicShardInfo: { clusterId: 3 } + pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) }); - const customDecoder2 = createDecoder(customContentTopic2, { clusterId: 3 }); + const customDecoder2 = createDecoder( + customContentTopic2, + pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) + ); const contentTopicInfoBothShards: ContentTopicInfo = { clusterId: 3, contentTopics: [customContentTopic1, customContentTopic2] diff --git a/packages/tests/tests/sharding/running_nodes.spec.ts b/packages/tests/tests/sharding/running_nodes.spec.ts index 6f08572a4f..2ea82e6ff2 100644 --- a/packages/tests/tests/sharding/running_nodes.spec.ts +++ b/packages/tests/tests/sharding/running_nodes.spec.ts @@ -10,7 +10,10 @@ import { utf8ToBytes, waitForRemotePeer } from "@waku/sdk"; -import { singleShardInfoToPubsubTopic } from "@waku/utils"; +import { + contentTopicToShardIndex, + singleShardInfoToPubsubTopic +} from "@waku/utils"; import { expect } from "chai"; import { @@ -138,12 +141,18 @@ describe("Autosharding: Running Nodes", () => { const encoder1 = createEncoder({ contentTopic: ContentTopic, - pubsubTopicShardInfo: { clusterId: 0 } + pubsubTopicShardInfo: { + clusterId: 0, + shard: contentTopicToShardIndex(ContentTopic) + } }); const encoder2 = createEncoder({ contentTopic: ContentTopic, - pubsubTopicShardInfo: { clusterId: 0 } + pubsubTopicShardInfo: { + clusterId: 0, + shard: contentTopicToShardIndex(ContentTopic2) + } }); const request1 = await waku.lightPush.send(encoder1, { diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index 70f5c1acec..d7ead85543 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -3,6 +3,7 @@ import type { ContentTopicInfo, IMessage, LightNode } from "@waku/interfaces"; import { createLightNode, Protocols } from "@waku/sdk"; import { contentTopicToPubsubTopic, + pubsubTopicToSingleShardInfo, singleShardInfosToShardInfo } from "@waku/utils"; import { expect } from "chai"; @@ -200,12 +201,14 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { clusterId, contentTopics: [customContentTopic1] }; - const customDecoder1 = createDecoder(customContentTopic1, { - clusterId - }); - const customDecoder2 = createDecoder(customContentTopic2, { - clusterId - }); + const customDecoder1 = createDecoder( + customContentTopic1, + pubsubTopicToSingleShardInfo(autoshardingPubsubTopic1) + ); + const customDecoder2 = createDecoder( + customContentTopic2, + pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) + ); const contentTopicInfoBothShards: ContentTopicInfo = { clusterId, contentTopics: [customContentTopic1, customContentTopic2] diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index 18ee894eb4..6e2b6de68b 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -1,5 +1,6 @@ import { sha256 } from "@noble/hashes/sha256"; import { + DEFAULT_CLUSTER_ID, DefaultPubsubTopic, PubsubTopic, ShardInfo, @@ -39,12 +40,9 @@ export const singleShardInfosToShardInfo = ( }; export const shardInfoToPubsubTopics = ( - shardInfo: ShardingParams + shardInfo: Partial ): PubsubTopic[] => { - if (shardInfo.clusterId === undefined) - throw new Error("Cluster ID must be specified"); - - if ("contentTopics" in shardInfo) { + if ("contentTopics" in shardInfo && shardInfo.contentTopics) { // Autosharding: explicitly defined content topics return Array.from( new Set( @@ -59,17 +57,20 @@ export const shardInfoToPubsubTopics = ( return Array.from( new Set( shardInfo.shards.map( - (index) => `/waku/2/rs/${shardInfo.clusterId}/${index}` + (index) => + `/waku/2/rs/${shardInfo.clusterId ?? DEFAULT_CLUSTER_ID}/${index}` ) ) ); - } else { + } else if ("application" in shardInfo && "version" in shardInfo) { // Autosharding: single shard from application and version return [ contentTopicToPubsubTopic( `/${shardInfo.application}/${shardInfo.version}/default/default` ) ]; + } else { + throw new Error("Missing required configuration in shard parameters"); } }; @@ -184,7 +185,7 @@ export function contentTopicToShardIndex( export function contentTopicToPubsubTopic( contentTopic: string, - clusterId: number = 1, + clusterId: number = DEFAULT_CLUSTER_ID, networkShards: number = 8 ): string { const shardIndex = contentTopicToShardIndex(contentTopic, networkShards); @@ -197,7 +198,7 @@ export function contentTopicToPubsubTopic( */ export function contentTopicsByPubsubTopic( contentTopics: string[], - clusterId: number = 1, + clusterId: number = DEFAULT_CLUSTER_ID, networkShards: number = 8 ): Map> { const groupedContentTopics = new Map(); @@ -237,3 +238,74 @@ export function determinePubsubTopic( : DefaultPubsubTopic; } } + +/** + * Validates sharding configuration and sets defaults where possible. + * @returns Validated sharding parameters, with any missing values set to defaults + */ +export const ensureShardingConfigured = ( + shardInfo: Partial +): { + shardingParams: ShardingParams; + shardInfo: ShardInfo; + pubsubTopics: PubsubTopic[]; +} => { + const clusterId = shardInfo.clusterId ?? DEFAULT_CLUSTER_ID; + const shards = "shards" in shardInfo ? shardInfo.shards : []; + const contentTopics = + "contentTopics" in shardInfo ? shardInfo.contentTopics : []; + const [application, version] = + "application" in shardInfo && "version" in shardInfo + ? [shardInfo.application, shardInfo.version] + : [undefined, undefined]; + + const isShardsConfigured = shards && shards.length > 0; + const isContentTopicsConfigured = contentTopics && contentTopics.length > 0; + const isApplicationVersionConfigured = application && version; + + if (isShardsConfigured) { + return { + shardingParams: { clusterId, shards }, + shardInfo: { clusterId, shards }, + pubsubTopics: shardInfoToPubsubTopics({ clusterId, shards }) + }; + } + + if (isContentTopicsConfigured) { + const pubsubTopics = Array.from( + new Set( + contentTopics.map((topic) => + contentTopicToPubsubTopic(topic, clusterId) + ) + ) + ); + const shards = Array.from( + new Set( + contentTopics.map((topic) => contentTopicToShardIndex(topic, clusterId)) + ) + ); + return { + shardingParams: { clusterId, contentTopics }, + shardInfo: { clusterId, shards }, + pubsubTopics + }; + } + + if (isApplicationVersionConfigured) { + const pubsubTopic = contentTopicToPubsubTopic( + `/${application}/${version}/default/default` + ); + return { + shardingParams: { clusterId, application, version }, + shardInfo: { + clusterId, + shards: [pubsubTopicToSingleShardInfo(pubsubTopic).shard] + }, + pubsubTopics: [pubsubTopic] + }; + } + + throw new Error( + "Missing minimum required configuration options for static sharding or autosharding." + ); +}; diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index d562cc49a6..04c37c7d6d 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -1,5 +1,5 @@ import type { Connection, Peer, PeerStore } from "@libp2p/interface"; -import { ShardingParams } from "@waku/interfaces"; +import { ShardInfo } from "@waku/interfaces"; import { bytesToUtf8 } from "../bytes/index.js"; import { decodeRelayShard } from "../common/relay_shard_codec.js"; @@ -74,7 +74,7 @@ export async function getConnectedPeersForProtocolAndShard( connections: Connection[], peerStore: PeerStore, protocols: string[], - shardInfo?: ShardingParams + shardInfo?: ShardInfo ): Promise { const openConnections = connections.filter( (connection) => connection.status === "open"