From 78c856d0796a73848815b615bea24d3f5395da78 Mon Sep 17 00:00:00 2001 From: fryorcraken <110212804+fryorcraken@users.noreply.github.com> Date: Mon, 25 Aug 2025 10:49:05 +1000 Subject: [PATCH] fix: remove sharding circular dependency (#2590) --- packages/utils/src/common/sharding/index.ts | 164 +----------------- .../utils/src/common/sharding/routing_info.ts | 4 +- .../{index.spec.ts => topics.spec.ts} | 77 +------- packages/utils/src/common/sharding/topics.ts | 162 +++++++++++++++++ 4 files changed, 166 insertions(+), 241 deletions(-) rename packages/utils/src/common/sharding/{index.spec.ts => topics.spec.ts} (75%) create mode 100644 packages/utils/src/common/sharding/topics.ts diff --git a/packages/utils/src/common/sharding/index.ts b/packages/utils/src/common/sharding/index.ts index 495ea42c6e..c4e816f389 100644 --- a/packages/utils/src/common/sharding/index.ts +++ b/packages/utils/src/common/sharding/index.ts @@ -1,165 +1,3 @@ -import { sha256 } from "@noble/hashes/sha256"; -import { - type ClusterId, - ContentTopic, - PubsubTopic, - type ShardId -} from "@waku/interfaces"; - -import { concat, utf8ToBytes } from "../../bytes/index.js"; - export * from "./type_guards.js"; export * from "./routing_info.js"; - -export const formatPubsubTopic = ( - clusterId: ClusterId, - shard: ShardId -): PubsubTopic => { - return `/waku/2/rs/${clusterId}/${shard}`; -}; - -/** - * @deprecated will be removed - */ -export const pubsubTopicToSingleShardInfo = ( - pubsubTopics: PubsubTopic -): { clusterId: ClusterId; shard: ShardId } => { - const parts = pubsubTopics.split("/"); - - if ( - parts.length != 6 || - parts[1] !== "waku" || - parts[2] !== "2" || - parts[3] !== "rs" - ) - throw new Error("Invalid pubsub topic"); - - const clusterId = parseInt(parts[4]); - const shard = parseInt(parts[5]); - - if (isNaN(clusterId) || isNaN(shard)) - throw new Error("Invalid clusterId or shard"); - - return { - clusterId, - shard - }; -}; - -interface ParsedContentTopic { - generation: number; - application: string; - version: string; - topicName: string; - encoding: string; -} - -/** - * Given a string, will throw an error if it is not formatted as a valid content topic for autosharding based on https://rfc.vac.dev/spec/51/ - * @param contentTopic String to validate - * @returns Object with each content topic field as an attribute - */ -export function ensureValidContentTopic( - contentTopic: ContentTopic -): ParsedContentTopic { - const parts = (contentTopic as string).split("/"); - if (parts.length < 5 || parts.length > 6) { - throw Error(`Content topic format is invalid: ${contentTopic}`); - } - // Validate generation field if present - let generation = 0; - if (parts.length == 6) { - generation = parseInt(parts[1]); - if (isNaN(generation)) { - throw new Error( - `Invalid generation field in content topic: ${contentTopic}` - ); - } - if (generation > 0) { - throw new Error( - `Generation greater than 0 is not supported: ${contentTopic}` - ); - } - } - // Validate remaining fields - const fields = parts.splice(-4); - // Validate application field - if (fields[0].length == 0) { - throw new Error(`Application field cannot be empty: ${contentTopic}`); - } - // Validate version field - if (fields[1].length == 0) { - throw new Error(`Version field cannot be empty: ${contentTopic}`); - } - // Validate topic name field - if (fields[2].length == 0) { - throw new Error(`Topic name field cannot be empty: ${contentTopic}`); - } - // Validate encoding field - if (fields[3].length == 0) { - throw new Error(`Encoding field cannot be empty: ${contentTopic}`); - } - - return { - generation, - application: fields[0], - version: fields[1], - topicName: fields[2], - encoding: fields[3] - }; -} - -/** - * Given a string, determines which autoshard index to use for its pubsub topic. - * Based on the algorithm described in the RFC: https://rfc.vac.dev/spec/51//#algorithm - */ -export function contentTopicToShardIndex( - contentTopic: ContentTopic, - numShardsInCluster: number -): number { - const { application, version } = ensureValidContentTopic(contentTopic); - const digest = sha256( - concat([utf8ToBytes(application), utf8ToBytes(version)]) - ); - const dataview = new DataView(digest.buffer.slice(-8)); - return Number(dataview.getBigUint64(0, false) % BigInt(numShardsInCluster)); -} - -export function contentTopicToPubsubTopic( - contentTopic: ContentTopic, - clusterId: number, - numShardsInCluster: number -): string { - if (!contentTopic) { - throw Error("Content topic must be specified"); - } - - const shardIndex = contentTopicToShardIndex(contentTopic, numShardsInCluster); - return `/waku/2/rs/${clusterId}/${shardIndex}`; -} - -/** - * Given an array of content topics, groups them together by their Pubsub topic as derived using the algorithm for autosharding. - * If any of the content topics are not properly formatted, the function will throw an error. - */ -export function contentTopicsByPubsubTopic( - contentTopics: ContentTopic[], - clusterId: number, - networkShards: number -): Map> { - const groupedContentTopics = new Map(); - for (const contentTopic of contentTopics) { - const pubsubTopic = contentTopicToPubsubTopic( - contentTopic, - clusterId, - networkShards - ); - let topics = groupedContentTopics.get(pubsubTopic); - if (!topics) { - groupedContentTopics.set(pubsubTopic, []); - topics = groupedContentTopics.get(pubsubTopic); - } - topics.push(contentTopic); - } - return groupedContentTopics; -} +export * from "./topics.js"; diff --git a/packages/utils/src/common/sharding/routing_info.ts b/packages/utils/src/common/sharding/routing_info.ts index a51de7cfc3..45194ba360 100644 --- a/packages/utils/src/common/sharding/routing_info.ts +++ b/packages/utils/src/common/sharding/routing_info.ts @@ -13,9 +13,9 @@ import { contentTopicToShardIndex, ensureValidContentTopic, formatPubsubTopic, - isAutoSharding, pubsubTopicToSingleShardInfo -} from "./index.js"; +} from "./topics.js"; +import { isAutoSharding } from "./type_guards.js"; export type RoutingInfo = AutoShardingRoutingInfo | StaticShardingRoutingInfo; diff --git a/packages/utils/src/common/sharding/index.spec.ts b/packages/utils/src/common/sharding/topics.spec.ts similarity index 75% rename from packages/utils/src/common/sharding/index.spec.ts rename to packages/utils/src/common/sharding/topics.spec.ts index 28b5aeba2e..2460983aab 100644 --- a/packages/utils/src/common/sharding/index.spec.ts +++ b/packages/utils/src/common/sharding/topics.spec.ts @@ -7,7 +7,7 @@ import { contentTopicToShardIndex, ensureValidContentTopic, pubsubTopicToSingleShardInfo -} from "./index.js"; +} from "./topics.js"; const ClusterId = 0; const NumShardsInCluster = 8; @@ -292,78 +292,3 @@ describe("pubsubTopicToSingleShardInfo with various invalid formats", () => { }); }); }); - -// describe("ensureShardingConfigured", () => { -// it("should return valid sharding parameters for static sharding", () => { -// const shardInfo = { clusterId: 1, shards: [0, 1] }; -// const result = ensureShardingConfigured(shardInfo); -// expect(result.shardInfo).to.deep.include({ -// clusterId: 1, -// shards: [0, 1] -// }); -// expect(result.shardInfo).to.deep.include({ clusterId: 1, shards: [0, 1] }); -// expect(result.pubsubTopics).to.have.members([ -// "/waku/2/rs/1/0", -// "/waku/2/rs/1/1" -// ]); -// }); -// -// it("should return valid sharding parameters for content topics autosharding", () => { -// const contentTopicInfo = { contentTopics: ["/app/v1/topic1/proto"] }; -// const result = ensureShardingConfigured(contentTopicInfo); -// const expectedPubsubTopic = contentTopicToPubsubTopic( -// "/app/v1/topic1/proto", -// DEFAULT_CLUSTER_ID -// ); -// expect(result.shardInfo.shards).to.include( -// contentTopicToShardIndex("/app/v1/topic1/proto") -// ); -// expect(result.pubsubTopics).to.include(expectedPubsubTopic); -// }); -// -// it("should throw an error for missing sharding configuration", () => { -// const shardInfo = {} as any as NetworkConfig; -// expect(() => ensureShardingConfigured(shardInfo)).to.throw(); -// }); -// -// it("handles empty shards array correctly", () => { -// const shardInfo = { clusterId: 1, shards: [] }; -// expect(() => ensureShardingConfigured(shardInfo)).to.throw(); -// }); -// -// it("handles empty contentTopics array correctly", () => { -// const shardInfo = { contentTopics: [] }; -// expect(() => ensureShardingConfigured(shardInfo)).to.throw(); -// }); -// }); -// -// describe("contentTopicToPubsubTopic", () => { -// it("should correctly map a content topic to a pubsub topic", () => { -// const contentTopic = "/app/v1/topic1/proto"; -// expect(contentTopicToPubsubTopic(contentTopic)).to.equal("/waku/2/rs/1/4"); -// }); -// -// it("should map different content topics to different pubsub topics based on shard index", () => { -// const contentTopic1 = "/app/v1/topic1/proto"; -// const contentTopic2 = "/app/v2/topic2/proto"; -// const pubsubTopic1 = contentTopicToPubsubTopic(contentTopic1); -// const pubsubTopic2 = contentTopicToPubsubTopic(contentTopic2); -// expect(pubsubTopic1).not.to.equal(pubsubTopic2); -// }); -// -// it("should use the provided clusterId for the pubsub topic", () => { -// const contentTopic = "/app/v1/topic1/proto"; -// const clusterId = 2; -// expect(contentTopicToPubsubTopic(contentTopic, clusterId)).to.equal( -// "/waku/2/rs/2/4" -// ); -// }); -// -// it("should correctly map a content topic to a pubsub topic for different network shard sizes", () => { -// const contentTopic = "/app/v1/topic1/proto"; -// const networkShards = 16; -// expect(contentTopicToPubsubTopic(contentTopic, 1, networkShards)).to.equal( -// "/waku/2/rs/1/4" -// ); -// }); -// }); diff --git a/packages/utils/src/common/sharding/topics.ts b/packages/utils/src/common/sharding/topics.ts new file mode 100644 index 0000000000..fb377315e7 --- /dev/null +++ b/packages/utils/src/common/sharding/topics.ts @@ -0,0 +1,162 @@ +import { sha256 } from "@noble/hashes/sha256"; +import { + type ClusterId, + ContentTopic, + PubsubTopic, + type ShardId +} from "@waku/interfaces"; + +import { concat, utf8ToBytes } from "../../bytes/index.js"; + +export const formatPubsubTopic = ( + clusterId: ClusterId, + shard: ShardId +): PubsubTopic => { + return `/waku/2/rs/${clusterId}/${shard}`; +}; + +/** + * @deprecated will be removed + */ +export const pubsubTopicToSingleShardInfo = ( + pubsubTopics: PubsubTopic +): { clusterId: ClusterId; shard: ShardId } => { + const parts = pubsubTopics.split("/"); + + if ( + parts.length != 6 || + parts[1] !== "waku" || + parts[2] !== "2" || + parts[3] !== "rs" + ) + throw new Error("Invalid pubsub topic"); + + const clusterId = parseInt(parts[4]); + const shard = parseInt(parts[5]); + + if (isNaN(clusterId) || isNaN(shard)) + throw new Error("Invalid clusterId or shard"); + + return { + clusterId, + shard + }; +}; + +interface ParsedContentTopic { + generation: number; + application: string; + version: string; + topicName: string; + encoding: string; +} + +/** + * Given a string, will throw an error if it is not formatted as a valid content topic for autosharding based on https://rfc.vac.dev/spec/51/ + * @param contentTopic String to validate + * @returns Object with each content topic field as an attribute + */ +export function ensureValidContentTopic( + contentTopic: ContentTopic +): ParsedContentTopic { + const parts = (contentTopic as string).split("/"); + if (parts.length < 5 || parts.length > 6) { + throw Error(`Content topic format is invalid: ${contentTopic}`); + } + // Validate generation field if present + let generation = 0; + if (parts.length == 6) { + generation = parseInt(parts[1]); + if (isNaN(generation)) { + throw new Error( + `Invalid generation field in content topic: ${contentTopic}` + ); + } + if (generation > 0) { + throw new Error( + `Generation greater than 0 is not supported: ${contentTopic}` + ); + } + } + // Validate remaining fields + const fields = parts.splice(-4); + // Validate application field + if (fields[0].length == 0) { + throw new Error(`Application field cannot be empty: ${contentTopic}`); + } + // Validate version field + if (fields[1].length == 0) { + throw new Error(`Version field cannot be empty: ${contentTopic}`); + } + // Validate topic name field + if (fields[2].length == 0) { + throw new Error(`Topic name field cannot be empty: ${contentTopic}`); + } + // Validate encoding field + if (fields[3].length == 0) { + throw new Error(`Encoding field cannot be empty: ${contentTopic}`); + } + + return { + generation, + application: fields[0], + version: fields[1], + topicName: fields[2], + encoding: fields[3] + }; +} + +/** + * Given a string, determines which autoshard index to use for its pubsub topic. + * Based on the algorithm described in the RFC: https://rfc.vac.dev/spec/51//#algorithm + */ +export function contentTopicToShardIndex( + contentTopic: ContentTopic, + numShardsInCluster: number +): number { + const { application, version } = ensureValidContentTopic(contentTopic); + const digest = sha256( + concat([utf8ToBytes(application), utf8ToBytes(version)]) + ); + const dataview = new DataView(digest.buffer.slice(-8)); + return Number(dataview.getBigUint64(0, false) % BigInt(numShardsInCluster)); +} + +export function contentTopicToPubsubTopic( + contentTopic: ContentTopic, + clusterId: number, + numShardsInCluster: number +): string { + if (!contentTopic) { + throw Error("Content topic must be specified"); + } + + const shardIndex = contentTopicToShardIndex(contentTopic, numShardsInCluster); + return `/waku/2/rs/${clusterId}/${shardIndex}`; +} + +/** + * Given an array of content topics, groups them together by their Pubsub topic as derived using the algorithm for autosharding. + * If any of the content topics are not properly formatted, the function will throw an error. + */ +export function contentTopicsByPubsubTopic( + contentTopics: ContentTopic[], + clusterId: number, + networkShards: number +): Map> { + const groupedContentTopics = new Map(); + for (const contentTopic of contentTopics) { + const pubsubTopic = contentTopicToPubsubTopic( + contentTopic, + clusterId, + networkShards + ); + let topics = groupedContentTopics.get(pubsubTopic); + if (!topics) { + groupedContentTopics.set(pubsubTopic, []); + topics = groupedContentTopics.get(pubsubTopic); + } + topics.push(contentTopic); + } + return groupedContentTopics; +}