diff --git a/packages/core/src/lib/connection_manager/shard_reader.spec.ts b/packages/core/src/lib/connection_manager/shard_reader.spec.ts index 79f058a1d7..b618954d06 100644 --- a/packages/core/src/lib/connection_manager/shard_reader.spec.ts +++ b/packages/core/src/lib/connection_manager/shard_reader.spec.ts @@ -6,7 +6,7 @@ import { PubsubTopic, ShardInfo } from "@waku/interfaces"; -import { contentTopicToShardIndex, encodeRelayShard } from "@waku/utils"; +import { AutoShardingRoutingInfo, encodeRelayShard } from "@waku/utils"; import { expect } from "chai"; import { Libp2p } from "libp2p"; import sinon from "sinon"; @@ -29,16 +29,17 @@ describe("ShardReader", function () { const testContentTopic = "/test/1/waku-light-push/utf8"; const testClusterId = 3; - const testShardIndex = contentTopicToShardIndex( - testContentTopic, - DEFAULT_NUM_SHARDS - ); const testNetworkConfig: AutoSharding = { clusterId: testClusterId, numShardsInCluster: DEFAULT_NUM_SHARDS }; + const testShardIndex = AutoShardingRoutingInfo.fromContentTopic( + testContentTopic, + testNetworkConfig + ).shardId; + const testShardInfo: ShardInfo = { clusterId: testClusterId, shards: [testShardIndex] diff --git a/packages/core/src/lib/connection_manager/shard_reader.ts b/packages/core/src/lib/connection_manager/shard_reader.ts index 0867795ca4..89f95bc00d 100644 --- a/packages/core/src/lib/connection_manager/shard_reader.ts +++ b/packages/core/src/lib/connection_manager/shard_reader.ts @@ -6,11 +6,7 @@ import type { ShardId, ShardInfo } from "@waku/interfaces"; -import { - decodeRelayShard, - Logger, - pubsubTopicToSingleShardInfo -} from "@waku/utils"; +import { decodeRelayShard, Logger } from "@waku/utils"; import { Libp2p } from "libp2p"; const log = new Logger("shard-reader"); @@ -65,7 +61,7 @@ export class ShardReader implements IShardReader { pubsubTopic: PubsubTopic ): Promise { try { - const { clusterId, shard } = pubsubTopicToSingleShardInfo(pubsubTopic); + const { clusterId, shard } = this.parsePubsubTopic(pubsubTopic); if (clusterId !== this.clusterId) return false; return await this.isPeerOnShard(id, shard); } catch (error) { @@ -93,6 +89,34 @@ export class ShardReader implements IShardReader { ); } + private parsePubsubTopic(pubsubTopic: PubsubTopic): { + clusterId: ClusterId; + shard: ShardId; + } { + const parts = pubsubTopic.split("/"); + + if ( + parts.length !== 6 || + parts[1] !== "waku" || + parts[2] !== "2" || + parts[3] !== "rs" + ) { + throw new Error("Invalid pubsub topic"); + } + + const clusterId = parseInt(parts[4], 10); + const shard = parseInt(parts[5], 10); + + if (isNaN(clusterId) || isNaN(shard)) { + throw new Error("Invalid clusterId or shard"); + } + + return { + clusterId, + shard + }; + } + private async getRelayShards(id: PeerId): Promise { try { const peer = await this.libp2p.peerStore.get(id); diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 042a27dbd5..9a00b37674 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -18,3 +18,4 @@ export * from "./constants.js"; export * from "./local_storage.js"; export * from "./sharding.js"; export * from "./health_status.js"; +export type { ShardInfo } from "./sharding.js"; diff --git a/packages/reliability-tests/tests/high-throughput.spec.ts b/packages/reliability-tests/tests/high-throughput.spec.ts index 7a11e9caf9..46151e3dd1 100644 --- a/packages/reliability-tests/tests/high-throughput.spec.ts +++ b/packages/reliability-tests/tests/high-throughput.spec.ts @@ -1,10 +1,6 @@ import { LightNode, Protocols } from "@waku/interfaces"; import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk"; -import { - contentTopicToPubsubTopic, - createRoutingInfo, - delay -} from "@waku/utils"; +import { createRoutingInfo, delay } from "@waku/utils"; import { expect } from "chai"; import { @@ -64,13 +60,7 @@ describe("High Throughput Messaging", function () { await delay(1000); - await nwaku.ensureSubscriptions([ - contentTopicToPubsubTopic( - ContentTopic, - NetworkConfig.clusterId, - NetworkConfig.numShardsInCluster - ) - ]); + await nwaku.ensureSubscriptions([RoutingInfo.pubsubTopic]); waku = await createLightNode({ networkConfig: NetworkConfig }); await waku.start(); diff --git a/packages/reliability-tests/tests/longevity.spec.ts b/packages/reliability-tests/tests/longevity.spec.ts index e0ec05678f..f95990d7ce 100644 --- a/packages/reliability-tests/tests/longevity.spec.ts +++ b/packages/reliability-tests/tests/longevity.spec.ts @@ -1,10 +1,6 @@ import { LightNode, Protocols } from "@waku/interfaces"; import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk"; -import { - contentTopicToPubsubTopic, - createRoutingInfo, - delay -} from "@waku/utils"; +import { AutoShardingRoutingInfo, createRoutingInfo, delay } from "@waku/utils"; import { expect } from "chai"; import { @@ -62,11 +58,8 @@ describe("Longevity", function () { ); await nwaku.ensureSubscriptions([ - contentTopicToPubsubTopic( - ContentTopic, - networkConfig.clusterId, - networkConfig.numShardsInCluster - ) + AutoShardingRoutingInfo.fromContentTopic(ContentTopic, networkConfig) + .pubsubTopic ]); waku = await createLightNode({ networkConfig }); diff --git a/packages/reliability-tests/tests/throughput-sizes.spec.ts b/packages/reliability-tests/tests/throughput-sizes.spec.ts index 7044176223..779289ba6d 100644 --- a/packages/reliability-tests/tests/throughput-sizes.spec.ts +++ b/packages/reliability-tests/tests/throughput-sizes.spec.ts @@ -1,10 +1,6 @@ import { LightNode, Protocols } from "@waku/interfaces"; import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk"; -import { - contentTopicToPubsubTopic, - createRoutingInfo, - delay -} from "@waku/utils"; +import { AutoShardingRoutingInfo, createRoutingInfo, delay } from "@waku/utils"; import { expect } from "chai"; import { @@ -68,11 +64,8 @@ describe("Throughput Sanity Checks - Different Message Sizes", function () { await delay(1000); await nwaku.ensureSubscriptions([ - contentTopicToPubsubTopic( - ContentTopic, - networkConfig.clusterId, - networkConfig.numShardsInCluster - ) + AutoShardingRoutingInfo.fromContentTopic(ContentTopic, networkConfig) + .pubsubTopic ]); waku = await createLightNode({ networkConfig }); diff --git a/packages/tests/src/lib/service_node.ts b/packages/tests/src/lib/service_node.ts index a71526f9dc..79e6744923 100644 --- a/packages/tests/src/lib/service_node.ts +++ b/packages/tests/src/lib/service_node.ts @@ -3,11 +3,11 @@ import { peerIdFromString } from "@libp2p/peer-id"; import { Multiaddr, multiaddr } from "@multiformats/multiaddr"; import { ContentTopic, PubsubTopic } from "@waku/interfaces"; import { - formatPubsubTopic, isAutoSharding, isDefined, isStaticSharding, - RoutingInfo + RoutingInfo, + StaticShardingRoutingInfo } from "@waku/utils"; import { Logger } from "@waku/utils"; import pRetry from "p-retry"; @@ -279,10 +279,10 @@ export class ServiceNode { if (this.args?.shard) { if (this.args?.shard.length > 1) throw "More that one shard passed, not supported"; - const pubsubTopic = formatPubsubTopic( - this.args.clusterId ?? DefaultTestNetworkConfig.clusterId, - this.args?.shard[0] - ); + const pubsubTopic = StaticShardingRoutingInfo.fromShard( + this.args?.shard[0], + { clusterId: this.args.clusterId ?? DefaultTestNetworkConfig.clusterId } + ).pubsubTopic; return this.pubsubTopicMessages(pubsubTopic); } diff --git a/packages/tests/src/utils/nodes.ts b/packages/tests/src/utils/nodes.ts index 26f29e27c3..b0305759bf 100644 --- a/packages/tests/src/utils/nodes.ts +++ b/packages/tests/src/utils/nodes.ts @@ -6,10 +6,10 @@ import { } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { - contentTopicToPubsubTopic, - formatPubsubTopic, + AutoShardingRoutingInfo, isAutoShardingRoutingInfo, - RoutingInfo + RoutingInfo, + StaticShardingRoutingInfo } from "@waku/utils"; import { Context } from "mocha"; import pRetry from "p-retry"; @@ -75,7 +75,11 @@ export async function runMultipleNodes( if (customArgs?.shard) { const shards = customArgs?.shard ?? []; for (const s of shards) { - pubsubTopics.push(formatPubsubTopic(routingInfo.clusterId, s)); + pubsubTopics.push( + StaticShardingRoutingInfo.fromShard(s, { + clusterId: routingInfo.clusterId + }).pubsubTopic + ); } } @@ -83,11 +87,8 @@ export async function runMultipleNodes( const contentTopics = customArgs?.contentTopic ?? []; for (const ct of contentTopics) { pubsubTopics.push( - contentTopicToPubsubTopic( - ct, - routingInfo.clusterId, - routingInfo.networkConfig.numShardsInCluster - ) + AutoShardingRoutingInfo.fromContentTopic(ct, routingInfo.networkConfig) + .pubsubTopic ); } } diff --git a/packages/tests/tests/filter/subscribe-static-sharding.node.spec.ts b/packages/tests/tests/filter/subscribe-static-sharding.node.spec.ts index fa97915584..9009a5dd4f 100644 --- a/packages/tests/tests/filter/subscribe-static-sharding.node.spec.ts +++ b/packages/tests/tests/filter/subscribe-static-sharding.node.spec.ts @@ -1,7 +1,7 @@ import { createDecoder, createEncoder } from "@waku/core"; import { LightNode } from "@waku/interfaces"; import { Protocols, utf8ToBytes } from "@waku/sdk"; -import { createRoutingInfo, formatPubsubTopic } from "@waku/utils"; +import { createRoutingInfo, StaticShardingRoutingInfo } from "@waku/utils"; import { afterEachCustom, @@ -78,7 +78,9 @@ const runTests = (strictCheckNodes: boolean): void => { await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); await nwaku2.ensureSubscriptions([ - formatPubsubTopic(TestClusterId, shardId) + StaticShardingRoutingInfo.fromShard(shardId, { + clusterId: TestClusterId + }).pubsubTopic ]); const messageCollector2 = new MessageCollector(); diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 111ad5f420..5b35fe2054 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -1,6 +1,6 @@ import { createDecoder, createEncoder } from "@waku/core"; import { - contentTopicToShardIndex, + AutoShardingRoutingInfo, createRoutingInfo, Logger } from "@waku/utils"; @@ -11,14 +11,14 @@ export const log = new Logger("test:filter"); export const TestContentTopic = "/test/1/waku-filter/default"; export const TestClusterId = 2; export const TestNumShardsInCluster = 8; -export const TestShardIndex = contentTopicToShardIndex( - TestContentTopic, - TestNumShardsInCluster -); export const TestNetworkConfig = { clusterId: TestClusterId, numShardsInCluster: TestNumShardsInCluster }; +export const TestShardIndex = AutoShardingRoutingInfo.fromContentTopic( + TestContentTopic, + TestNetworkConfig +).shardId; export const TestRoutingInfo = createRoutingInfo(TestNetworkConfig, { contentTopic: TestContentTopic }); diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index 306c211a11..9e739542f9 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -1,6 +1,6 @@ import { AutoSharding, LightNode } from "@waku/interfaces"; import { createEncoder, utf8ToBytes } from "@waku/sdk"; -import { contentTopicToPubsubTopic, createRoutingInfo } from "@waku/utils"; +import { AutoShardingRoutingInfo, createRoutingInfo } from "@waku/utils"; import { expect } from "chai"; import { @@ -136,7 +136,10 @@ describe("Autosharding: Running Nodes", function () { it("Wrong topic", async function () { const wrongTopic = "wrong_format"; try { - contentTopicToPubsubTopic(wrongTopic, clusterId, 8); + AutoShardingRoutingInfo.fromContentTopic(wrongTopic, { + clusterId, + numShardsInCluster: 8 + }); throw new Error("Wrong topic should've thrown an error"); } catch (err) { if ( diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index a02c973e3c..7af9171ed1 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -3,7 +3,7 @@ import type { PeerId } from "@libp2p/interface"; import { wakuPeerExchangeDiscovery } from "@waku/discovery"; import type { AutoSharding, StaticSharding } from "@waku/interfaces"; import { createLightNode, LightNode, Tags } from "@waku/sdk"; -import { contentTopicToShardIndex } from "@waku/utils"; +import { AutoShardingRoutingInfo } from "@waku/utils"; import chai, { expect } from "chai"; import chaiAsPromised from "chai-as-promised"; import Sinon, { SinonSpy } from "sinon"; @@ -201,7 +201,14 @@ describe("Autosharding: Peer Management", function () { const ContentTopic = "/myapp/1/latest/proto"; const clusterId = 8; const numShardsInCluster = 8; - const Shard = [contentTopicToShardIndex(ContentTopic, numShardsInCluster)]; + const networkConfig: AutoSharding = { + clusterId, + numShardsInCluster + }; + const Shard = [ + AutoShardingRoutingInfo.fromContentTopic(ContentTopic, networkConfig) + .shardId + ]; describe("Peer Exchange", function () { let waku: LightNode; diff --git a/packages/tests/tests/store/error_handling.node.spec.ts b/packages/tests/tests/store/error_handling.node.spec.ts index ebb554fd6f..e84f04c44b 100644 --- a/packages/tests/tests/store/error_handling.node.spec.ts +++ b/packages/tests/tests/store/error_handling.node.spec.ts @@ -1,5 +1,5 @@ import { IMessage, type LightNode } from "@waku/interfaces"; -import { formatPubsubTopic } from "@waku/utils"; +import { StaticShardingRoutingInfo } from "@waku/utils"; import { expect } from "chai"; import { @@ -68,7 +68,9 @@ describe("Waku Store, error handling", function () { }); it("Query Generator, No message returned", async function () { - const WrongTestPubsubTopic = formatPubsubTopic(43, 53); + const WrongTestPubsubTopic = StaticShardingRoutingInfo.fromShard(53, { + clusterId: 43 + }).pubsubTopic; const messages = await processQueriedMessages( waku, [TestDecoder], diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index 55e15160b3..280f9b077c 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -2,7 +2,7 @@ import type { LightNode, RelayNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createRelayNode } from "@waku/relay"; import { createLightNode } from "@waku/sdk"; -import { formatPubsubTopic } from "@waku/utils"; +import { StaticShardingRoutingInfo } from "@waku/utils"; import { expect } from "chai"; import { @@ -261,7 +261,9 @@ describe("Wait for remote peer", function () { [nwaku, waku1] = await runRelayNodes(this, { clusterId: 0 }, [0]); const multiAddrWithId = await nwaku.getMultiaddrWithId(); - const peers = waku1.relay.getMeshPeers(formatPubsubTopic(0, 0)); + const peers = waku1.relay.getMeshPeers( + StaticShardingRoutingInfo.fromShard(0, { clusterId: 0 }).pubsubTopic + ); const nimPeerId = multiAddrWithId.getPeerId(); diff --git a/packages/utils/src/common/sharding/index.spec.ts b/packages/utils/src/common/sharding/index.spec.ts deleted file mode 100644 index 28b5aeba2e..0000000000 --- a/packages/utils/src/common/sharding/index.spec.ts +++ /dev/null @@ -1,369 +0,0 @@ -import { DEFAULT_CLUSTER_ID } from "@waku/interfaces"; -import { expect } from "chai"; - -import { - contentTopicsByPubsubTopic, - contentTopicToPubsubTopic, - contentTopicToShardIndex, - ensureValidContentTopic, - pubsubTopicToSingleShardInfo -} from "./index.js"; - -const ClusterId = 0; -const NumShardsInCluster = 8; - -const testInvalidCases = ( - contentTopics: string[], - expectedError: string -): void => { - for (const invalidTopic of contentTopics) { - expect(() => ensureValidContentTopic(invalidTopic)).to.throw(expectedError); - } -}; - -describe("ensureValidContentTopic", () => { - it("does not throw on valid cases", () => { - const validTopics = [ - "/0/myapp/1/mytopic/cbor", - "/myapp/1/mytopic/cbor", - "/myapp/v1.1/mytopic/cbor" - ]; - for (const validTopic of validTopics) { - expect(() => ensureValidContentTopic(validTopic)).to.not.throw; - } - }); - it("throws on empty content topic", () => { - testInvalidCases(["", " ", " "], "Content topic format is invalid"); - }); - - it("throws on content topic with too few or too many fields", () => { - testInvalidCases( - [ - "myContentTopic", - "myapp1mytopiccbor/", - " /myapp/1/mytopic", - "/myapp/1/mytopic", - "/0/myapp/1/mytopic/cbor/extra" - ], - "Content topic format is invalid" - ); - }); - - it("throws on content topic with non-number generation field", () => { - testInvalidCases( - [ - "/a/myapp/1/mytopic/cbor", - "/ /myapp/1/mytopic/cbor", - "/_/myapp/1/mytopic/cbor", - "/$/myapp/1/mytopic/cbor" - ], - "Invalid generation field in content topic" - ); - }); - - // Note that this test case should be removed once Waku supports other generations - it("throws on content topic with generation field greater than 0", () => { - testInvalidCases( - [ - "/1/myapp/1/mytopic/cbor", - "/2/myapp/1/mytopic/cbor", - "/3/myapp/1/mytopic/cbor", - "/1000/myapp/1/mytopic/cbor", - "/1/toychat/2/huilong/proto", - "/1/statusim/1/community/cbor" - ], - "Generation greater than 0 is not supported" - ); - }); - - it("throws on content topic with empty application field", () => { - testInvalidCases( - ["/0//1/mytopic/cbor"], - "Application field cannot be empty" - ); - }); - - it("throws on content topic with empty version field", () => { - testInvalidCases( - ["/0/myapp//mytopic/cbor"], - "Version field cannot be empty" - ); - }); - - it("throws on content topic with empty topic name field", () => { - testInvalidCases(["/0/myapp/1//cbor"], "Topic name field cannot be empty"); - }); - - it("throws on content topic with empty encoding field", () => { - testInvalidCases(["/0/myapp/1/mytopic/"], "Encoding field cannot be empty"); - }); -}); - -describe("contentTopicToShardIndex", () => { - const contentTopicsWithExpectedShards: [string, number][] = [ - ["/toychat/2/huilong/proto", 3], - ["/myapp/1/latest/proto", 0], - ["/waku/2/content/test.js", 1], - ["/toychat/2/huilong/proto", 3], - ["/0/toychat/2/huilong/proto", 3], - ["/statusim/1/community/cbor", 4], - ["/0/statusim/1/community/cbor", 4], - ["/app/22/sometopic/someencoding", 2], - ["/app/27/sometopic/someencoding", 5], - ["/app/20/sometopic/someencoding", 7], - ["/app/29/sometopic/someencoding", 6] - ]; - contentTopicsWithExpectedShards.forEach(([topic, expectedShard]) => { - it(`should correctly map ${topic} to shard index ${expectedShard}`, () => { - expect(contentTopicToShardIndex(topic, NumShardsInCluster)).to.eq( - expectedShard - ); - }); - }); - - const testCases: [number, string, number][] = [ - [16, "/app/20/sometopic/someencoding", 15], - [2, "/app/20/sometopic/someencoding", 1], - [1, "/app/20/sometopic/someencoding", 0] - ]; - - testCases.forEach(([networkShards, topic, expectedShard]) => { - it(`should correctly map ${topic} to shard index ${expectedShard} with networkShards ${networkShards}`, () => { - expect(contentTopicToShardIndex(topic, networkShards)).to.eq( - expectedShard - ); - }); - }); - - it("topics with same application and version share the same shard", () => { - const contentTopics: [string, string][] = [ - ["/toychat/2/huilong/proto", "/toychat/2/othertopic/otherencoding"], - ["/myapp/1/latest/proto", "/myapp/1/new/proto"], - ["/waku/2/content/test.js", "/waku/2/users/proto"] - ]; - for (const [topic1, topic2] of contentTopics) { - expect(contentTopicToShardIndex(topic1, NumShardsInCluster)).to.eq( - contentTopicToShardIndex(topic2, NumShardsInCluster) - ); - } - }); -}); - -describe("contentTopicsByPubsubTopic", () => { - it("groups content topics by expected pubsub topic", () => { - const contentTopics = ["/toychat/2/huilong/proto", "/myapp/1/latest/proto"]; - const grouped = contentTopicsByPubsubTopic( - contentTopics, - ClusterId, - NumShardsInCluster - ); - - for (const contentTopic of contentTopics) { - const pubsubTopic = contentTopicToPubsubTopic(contentTopic, 0, 8); - - expect(grouped.get(pubsubTopic)?.includes(contentTopic)).to.be.true; - } - }); - - it("groups multiple content topics into the same pubsub topic when they share the same shard index", () => { - const contentTopics = [ - "/app/22/sometopic/someencoding", - "/app/22/anothertopic/otherencoding" - ]; - const grouped = contentTopicsByPubsubTopic( - contentTopics, - ClusterId, - NumShardsInCluster - ); - expect(grouped.size).to.eq(1); // Only one pubsub topic expected - const pubsubTopic = contentTopicToPubsubTopic(contentTopics[0], 0, 8); - expect(grouped.get(pubsubTopic)?.length).to.eq(2); // Both topics should be grouped under the same pubsub topic - }); - - it("handles different clusterIds correctly", () => { - const contentTopics = ["/app/22/sometopic/someencoding"]; - const clusterId1 = 3; - const clusterId2 = 2; - const grouped1 = contentTopicsByPubsubTopic( - contentTopics, - clusterId1, - NumShardsInCluster - ); - const grouped2 = contentTopicsByPubsubTopic( - contentTopics, - clusterId2, - NumShardsInCluster - ); - const pubsubTopic1 = contentTopicToPubsubTopic( - contentTopics[0], - clusterId1, - 8 - ); - const pubsubTopic2 = contentTopicToPubsubTopic( - contentTopics[0], - clusterId2, - 8 - ); - expect(pubsubTopic1).not.to.equal(pubsubTopic2); - expect(grouped1.has(pubsubTopic1)).to.be.true; - expect(grouped1.has(pubsubTopic2)).to.be.false; - expect(grouped2.has(pubsubTopic1)).to.be.false; - expect(grouped2.has(pubsubTopic2)).to.be.true; - }); - - it("handles different networkShards values correctly", () => { - const contentTopics = ["/app/20/sometopic/someencoding"]; - const networkShards1 = 8; - const networkShards2 = 16; - const grouped1 = contentTopicsByPubsubTopic( - contentTopics, - DEFAULT_CLUSTER_ID, - networkShards1 - ); - const grouped2 = contentTopicsByPubsubTopic( - contentTopics, - DEFAULT_CLUSTER_ID, - networkShards2 - ); - const pubsubTopic1 = contentTopicToPubsubTopic( - contentTopics[0], - DEFAULT_CLUSTER_ID, - networkShards1 - ); - const pubsubTopic2 = contentTopicToPubsubTopic( - contentTopics[0], - DEFAULT_CLUSTER_ID, - networkShards2 - ); - expect(pubsubTopic1).not.to.equal(pubsubTopic2); - expect(grouped1.has(pubsubTopic1)).to.be.true; - expect(grouped1.has(pubsubTopic2)).to.be.false; - expect(grouped2.has(pubsubTopic1)).to.be.false; - expect(grouped2.has(pubsubTopic2)).to.be.true; - }); - - it("throws an error for improperly formatted content topics", () => { - const invalidContentTopics = ["/invalid/format"]; - expect(() => - contentTopicsByPubsubTopic( - invalidContentTopics, - ClusterId, - NumShardsInCluster - ) - ).to.throw(); - }); -}); - -describe("pubsubTopicToSingleShardInfo with various invalid formats", () => { - const invalidTopics = [ - "/waku/1/rs/1/2", // Invalid Waku version - "/waku/2/r/1/2", // Invalid path segment - "/incorrect/format", // Completely incorrect format - "/waku/2/rs", // Missing both clusterId and shard - "/waku/2/rs/1/2/extra" // Extra trailing data - ]; - - it("should extract SingleShardInfo from a valid PubsubTopic", () => { - const topic = "/waku/2/rs/2/2"; - const expectedInfo = { clusterId: 2, shard: 2 }; - expect(pubsubTopicToSingleShardInfo(topic)).to.deep.equal(expectedInfo); - }); - - invalidTopics.forEach((topic) => { - it(`should throw an error for invalid PubsubTopic format: ${topic}`, () => { - expect(() => pubsubTopicToSingleShardInfo(topic)).to.throw( - "Invalid pubsub topic" - ); - }); - }); - - const nonNumericValues = ["x", "y", "$", "!", "\\", "-", "", " "]; - nonNumericValues.forEach((value) => { - it(`should throw an error for non-numeric clusterId: /waku/2/rs/${value}/1`, () => { - expect(() => - pubsubTopicToSingleShardInfo(`/waku/2/rs/${value}/1`) - ).to.throw("Invalid clusterId or shard"); - }); - - it(`should throw an error for non-numeric shard: /waku/2/rs/1/${value}`, () => { - expect(() => - pubsubTopicToSingleShardInfo(`/waku/2/rs/1/${value}`) - ).to.throw("Invalid clusterId or shard"); - }); - }); -}); - -// describe("ensureShardingConfigured", () => { -// it("should return valid sharding parameters for static sharding", () => { -// const shardInfo = { clusterId: 1, shards: [0, 1] }; -// const result = ensureShardingConfigured(shardInfo); -// expect(result.shardInfo).to.deep.include({ -// clusterId: 1, -// shards: [0, 1] -// }); -// expect(result.shardInfo).to.deep.include({ clusterId: 1, shards: [0, 1] }); -// expect(result.pubsubTopics).to.have.members([ -// "/waku/2/rs/1/0", -// "/waku/2/rs/1/1" -// ]); -// }); -// -// it("should return valid sharding parameters for content topics autosharding", () => { -// const contentTopicInfo = { contentTopics: ["/app/v1/topic1/proto"] }; -// const result = ensureShardingConfigured(contentTopicInfo); -// const expectedPubsubTopic = contentTopicToPubsubTopic( -// "/app/v1/topic1/proto", -// DEFAULT_CLUSTER_ID -// ); -// expect(result.shardInfo.shards).to.include( -// contentTopicToShardIndex("/app/v1/topic1/proto") -// ); -// expect(result.pubsubTopics).to.include(expectedPubsubTopic); -// }); -// -// it("should throw an error for missing sharding configuration", () => { -// const shardInfo = {} as any as NetworkConfig; -// expect(() => ensureShardingConfigured(shardInfo)).to.throw(); -// }); -// -// it("handles empty shards array correctly", () => { -// const shardInfo = { clusterId: 1, shards: [] }; -// expect(() => ensureShardingConfigured(shardInfo)).to.throw(); -// }); -// -// it("handles empty contentTopics array correctly", () => { -// const shardInfo = { contentTopics: [] }; -// expect(() => ensureShardingConfigured(shardInfo)).to.throw(); -// }); -// }); -// -// describe("contentTopicToPubsubTopic", () => { -// it("should correctly map a content topic to a pubsub topic", () => { -// const contentTopic = "/app/v1/topic1/proto"; -// expect(contentTopicToPubsubTopic(contentTopic)).to.equal("/waku/2/rs/1/4"); -// }); -// -// it("should map different content topics to different pubsub topics based on shard index", () => { -// const contentTopic1 = "/app/v1/topic1/proto"; -// const contentTopic2 = "/app/v2/topic2/proto"; -// const pubsubTopic1 = contentTopicToPubsubTopic(contentTopic1); -// const pubsubTopic2 = contentTopicToPubsubTopic(contentTopic2); -// expect(pubsubTopic1).not.to.equal(pubsubTopic2); -// }); -// -// it("should use the provided clusterId for the pubsub topic", () => { -// const contentTopic = "/app/v1/topic1/proto"; -// const clusterId = 2; -// expect(contentTopicToPubsubTopic(contentTopic, clusterId)).to.equal( -// "/waku/2/rs/2/4" -// ); -// }); -// -// it("should correctly map a content topic to a pubsub topic for different network shard sizes", () => { -// const contentTopic = "/app/v1/topic1/proto"; -// const networkShards = 16; -// expect(contentTopicToPubsubTopic(contentTopic, 1, networkShards)).to.equal( -// "/waku/2/rs/1/4" -// ); -// }); -// }); diff --git a/packages/utils/src/common/sharding/index.ts b/packages/utils/src/common/sharding/index.ts index 495ea42c6e..b09c4a38b4 100644 --- a/packages/utils/src/common/sharding/index.ts +++ b/packages/utils/src/common/sharding/index.ts @@ -1,165 +1,2 @@ -import { sha256 } from "@noble/hashes/sha256"; -import { - type ClusterId, - ContentTopic, - PubsubTopic, - type ShardId -} from "@waku/interfaces"; - -import { concat, utf8ToBytes } from "../../bytes/index.js"; - export * from "./type_guards.js"; export * from "./routing_info.js"; - -export const formatPubsubTopic = ( - clusterId: ClusterId, - shard: ShardId -): PubsubTopic => { - return `/waku/2/rs/${clusterId}/${shard}`; -}; - -/** - * @deprecated will be removed - */ -export const pubsubTopicToSingleShardInfo = ( - pubsubTopics: PubsubTopic -): { clusterId: ClusterId; shard: ShardId } => { - const parts = pubsubTopics.split("/"); - - if ( - parts.length != 6 || - parts[1] !== "waku" || - parts[2] !== "2" || - parts[3] !== "rs" - ) - throw new Error("Invalid pubsub topic"); - - const clusterId = parseInt(parts[4]); - const shard = parseInt(parts[5]); - - if (isNaN(clusterId) || isNaN(shard)) - throw new Error("Invalid clusterId or shard"); - - return { - clusterId, - shard - }; -}; - -interface ParsedContentTopic { - generation: number; - application: string; - version: string; - topicName: string; - encoding: string; -} - -/** - * Given a string, will throw an error if it is not formatted as a valid content topic for autosharding based on https://rfc.vac.dev/spec/51/ - * @param contentTopic String to validate - * @returns Object with each content topic field as an attribute - */ -export function ensureValidContentTopic( - contentTopic: ContentTopic -): ParsedContentTopic { - const parts = (contentTopic as string).split("/"); - if (parts.length < 5 || parts.length > 6) { - throw Error(`Content topic format is invalid: ${contentTopic}`); - } - // Validate generation field if present - let generation = 0; - if (parts.length == 6) { - generation = parseInt(parts[1]); - if (isNaN(generation)) { - throw new Error( - `Invalid generation field in content topic: ${contentTopic}` - ); - } - if (generation > 0) { - throw new Error( - `Generation greater than 0 is not supported: ${contentTopic}` - ); - } - } - // Validate remaining fields - const fields = parts.splice(-4); - // Validate application field - if (fields[0].length == 0) { - throw new Error(`Application field cannot be empty: ${contentTopic}`); - } - // Validate version field - if (fields[1].length == 0) { - throw new Error(`Version field cannot be empty: ${contentTopic}`); - } - // Validate topic name field - if (fields[2].length == 0) { - throw new Error(`Topic name field cannot be empty: ${contentTopic}`); - } - // Validate encoding field - if (fields[3].length == 0) { - throw new Error(`Encoding field cannot be empty: ${contentTopic}`); - } - - return { - generation, - application: fields[0], - version: fields[1], - topicName: fields[2], - encoding: fields[3] - }; -} - -/** - * Given a string, determines which autoshard index to use for its pubsub topic. - * Based on the algorithm described in the RFC: https://rfc.vac.dev/spec/51//#algorithm - */ -export function contentTopicToShardIndex( - contentTopic: ContentTopic, - numShardsInCluster: number -): number { - const { application, version } = ensureValidContentTopic(contentTopic); - const digest = sha256( - concat([utf8ToBytes(application), utf8ToBytes(version)]) - ); - const dataview = new DataView(digest.buffer.slice(-8)); - return Number(dataview.getBigUint64(0, false) % BigInt(numShardsInCluster)); -} - -export function contentTopicToPubsubTopic( - contentTopic: ContentTopic, - clusterId: number, - numShardsInCluster: number -): string { - if (!contentTopic) { - throw Error("Content topic must be specified"); - } - - const shardIndex = contentTopicToShardIndex(contentTopic, numShardsInCluster); - return `/waku/2/rs/${clusterId}/${shardIndex}`; -} - -/** - * Given an array of content topics, groups them together by their Pubsub topic as derived using the algorithm for autosharding. - * If any of the content topics are not properly formatted, the function will throw an error. - */ -export function contentTopicsByPubsubTopic( - contentTopics: ContentTopic[], - clusterId: number, - networkShards: number -): Map> { - const groupedContentTopics = new Map(); - for (const contentTopic of contentTopics) { - const pubsubTopic = contentTopicToPubsubTopic( - contentTopic, - clusterId, - networkShards - ); - let topics = groupedContentTopics.get(pubsubTopic); - if (!topics) { - groupedContentTopics.set(pubsubTopic, []); - topics = groupedContentTopics.get(pubsubTopic); - } - topics.push(contentTopic); - } - return groupedContentTopics; -} diff --git a/packages/utils/src/common/sharding/routing_info.ts b/packages/utils/src/common/sharding/routing_info.ts index a51de7cfc3..f653c93aba 100644 --- a/packages/utils/src/common/sharding/routing_info.ts +++ b/packages/utils/src/common/sharding/routing_info.ts @@ -1,3 +1,4 @@ +import { sha256 } from "@noble/hashes/sha256"; import type { AutoSharding, ClusterId, @@ -9,25 +10,133 @@ import type { StaticSharding } from "@waku/interfaces"; -import { - contentTopicToShardIndex, - ensureValidContentTopic, - formatPubsubTopic, - isAutoSharding, - pubsubTopicToSingleShardInfo -} from "./index.js"; +import { concat, utf8ToBytes } from "../../bytes/index.js"; + +import { isAutoSharding } from "./index.js"; + +const formatPubsubTopic = ( + clusterId: ClusterId, + shard: ShardId +): PubsubTopic => { + return `/waku/2/rs/${clusterId}/${shard}`; +}; + +interface ParsedContentTopic { + generation: number; + application: string; + version: string; + topicName: string; + encoding: string; +} + +function ensureValidContentTopic( + contentTopic: ContentTopic +): ParsedContentTopic { + const parts = (contentTopic as string).split("/"); + if (parts.length < 5 || parts.length > 6) { + throw Error(`Content topic format is invalid: ${contentTopic}`); + } + // Validate generation field if present + let generation = 0; + if (parts.length == 6) { + generation = parseInt(parts[1]); + if (isNaN(generation)) { + throw new Error( + `Invalid generation field in content topic: ${contentTopic}` + ); + } + if (generation > 0) { + throw new Error( + `Generation greater than 0 is not supported: ${contentTopic}` + ); + } + } + // Validate remaining fields + const fields = parts.splice(-4); + // Validate application field + if (fields[0].length == 0) { + throw new Error(`Application field cannot be empty: ${contentTopic}`); + } + // Validate version field + if (fields[1].length == 0) { + throw new Error(`Version field cannot be empty: ${contentTopic}`); + } + // Validate topic name field + if (fields[2].length == 0) { + throw new Error(`Topic name field cannot be empty: ${contentTopic}`); + } + // Validate encoding field + if (fields[3].length == 0) { + throw new Error(`Encoding field cannot be empty: ${contentTopic}`); + } + + return { + generation, + application: fields[0], + version: fields[1], + topicName: fields[2], + encoding: fields[3] + }; +} + +function contentTopicToShardIndex( + contentTopic: ContentTopic, + numShardsInCluster: number +): number { + const { application, version } = ensureValidContentTopic(contentTopic); + const digest = sha256( + concat([utf8ToBytes(application), utf8ToBytes(version)]) + ); + const dataview = new DataView(digest.buffer.slice(-8)); + return Number(dataview.getBigUint64(0, false) % BigInt(numShardsInCluster)); +} + +/** + * @deprecated will be removed + */ +const pubsubTopicToSingleShardInfo = ( + pubsubTopics: PubsubTopic +): { clusterId: ClusterId; shard: ShardId } => { + const parts = pubsubTopics.split("/"); + + if ( + parts.length != 6 || + parts[1] !== "waku" || + parts[2] !== "2" || + parts[3] !== "rs" + ) + throw new Error("Invalid pubsub topic"); + + const clusterId = parseInt(parts[4]); + const shard = parseInt(parts[5]); + + if (isNaN(clusterId) || isNaN(shard)) + throw new Error("Invalid clusterId or shard"); + + return { + clusterId, + shard + }; +}; export type RoutingInfo = AutoShardingRoutingInfo | StaticShardingRoutingInfo; export abstract class BaseRoutingInfo { + public pubsubTopic: PubsubTopic; + public shardId: ShardId; + protected constructor( public networkConfig: NetworkConfig, - public pubsubTopic: PubsubTopic, - public shardId: ShardId - ) {} + pubsubTopic: PubsubTopic, + shardId: ShardId + ) { + this.pubsubTopic = pubsubTopic; + this.shardId = shardId; + } - public abstract get isAutoSharding(): boolean; - public abstract get isStaticSharding(): boolean; + public get clusterId(): ClusterId { + return this.networkConfig.clusterId; + } } export class AutoShardingRoutingInfo @@ -61,24 +170,12 @@ export class AutoShardingRoutingInfo */ private constructor( public networkConfig: AutoSharding, - public pubsubTopic: PubsubTopic, - public shardId: ShardId, + pubsubTopic: PubsubTopic, + shardId: ShardId, public contentTopic: string ) { super(networkConfig, pubsubTopic, shardId); } - - public get clusterId(): number { - return this.networkConfig.clusterId; - } - - public get isAutoSharding(): boolean { - return true; - } - - public get isStaticSharding(): boolean { - return false; - } } export class StaticShardingRoutingInfo @@ -127,35 +224,23 @@ export class StaticShardingRoutingInfo */ private constructor( public networkConfig: StaticSharding, - public pubsubTopic: PubsubTopic, - public shardId: ShardId + pubsubTopic: PubsubTopic, + shardId: ShardId ) { super(networkConfig, pubsubTopic, shardId); } - - public get clusterId(): ClusterId { - return this.networkConfig.clusterId; - } - - public get isAutoSharding(): boolean { - return false; - } - - public get isStaticSharding(): boolean { - return true; - } } export function isAutoShardingRoutingInfo( routingInfo: BaseRoutingInfo ): routingInfo is AutoShardingRoutingInfo { - return routingInfo.isAutoSharding; + return routingInfo instanceof AutoShardingRoutingInfo; } export function isStaticShardingRoutingInfo( routingInfo: BaseRoutingInfo ): routingInfo is StaticShardingRoutingInfo { - return routingInfo.isStaticSharding; + return routingInfo instanceof StaticShardingRoutingInfo; } export function createRoutingInfo(