240 lines
6.8 KiB
TypeScript
Raw Normal View History

import { sha256 } from "@noble/hashes/sha256";
import {
DefaultPubsubTopic,
PubsubTopic,
ShardInfo,
ShardingParams,
SingleShardInfo
} from "@waku/interfaces";
import { concat, utf8ToBytes } from "../bytes/index.js";
export const singleShardInfoToPubsubTopic = (
shardInfo: SingleShardInfo
): PubsubTopic => {
if (shardInfo.clusterId === undefined || shardInfo.shard === undefined)
throw new Error("Invalid shard");
return `/waku/2/rs/${shardInfo.clusterId}/${shardInfo.shard}`;
};
export const singleShardInfosToShardInfo = (
singleShardInfos: SingleShardInfo[]
): ShardInfo => {
if (singleShardInfos.length === 0) throw new Error("Invalid shard");
const clusterIds = singleShardInfos.map((shardInfo) => shardInfo.clusterId);
if (new Set(clusterIds).size !== 1) {
throw new Error("Passed shard infos have different clusterIds");
}
const shards = singleShardInfos
.map((shardInfo) => shardInfo.shard)
.filter((shard): shard is number => shard !== undefined);
return {
clusterId: singleShardInfos[0].clusterId,
shards
};
};
export const shardInfoToPubsubTopics = (
shardInfo: ShardingParams
): PubsubTopic[] => {
if (shardInfo.clusterId === undefined)
throw new Error("Cluster ID must be specified");
if ("contentTopics" in shardInfo) {
// Autosharding: explicitly defined content topics
return Array.from(
new Set(
shardInfo.contentTopics.map((contentTopic) =>
contentTopicToPubsubTopic(contentTopic, shardInfo.clusterId)
)
)
);
} else if ("shards" in shardInfo) {
// Static sharding
if (shardInfo.shards === undefined) throw new Error("Invalid shard");
return Array.from(
new Set(
shardInfo.shards.map(
(index) => `/waku/2/rs/${shardInfo.clusterId}/${index}`
)
)
);
} else {
// Autosharding: single shard from application and version
return [
contentTopicToPubsubTopic(
`/${shardInfo.application}/${shardInfo.version}/default/default`
)
];
}
};
feat(static-sharding)!: allow multiple pubSubTopics (#1586) * `ProtocolCreateOptions` now has `pubSubTopic` as `pubSubTopic[]` * chore: update encoder & decoder to support `PubSubTopic` * feat(protocols): allow multiple `PubSubTopic[]` * feat(relay): allow multiple `PubSubTopic[]` * chore(tests): update for new API * chore: minor fixes * chore: make store more robust * fix(relay): correctly set types * chore(address comments): update terminology around configured pubsub topics * chore(address comments): minor refactoring * chore(relay): split `subscribe` into smaller functions for readability & modularity * chore(address comments): refactor `waitForGossipSubPeerInMesh` * chore(store): only allow to query one `pubSubTopic` * fix: `store` bug * feat(tests): add some basic tests * sharding utils * address comments * feat(relay): re-add API for `getMeshPeers` * update error message Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com> * refactor for new API * feat: simplify handling of observers (#1614) * refactor: simplify handling of observers * refactor: Remove redundant PubSubTopic from Observer * use `??` instead of `||` * update `pubsubTopic` to `pubSubTopic` * update `interval` typo * change occurence of `pubsubTopic` to `pubSubTopic` * relay: rm `getAllMeshPeers` and make `pubSubTopics` public * relay: use `push_or_init_map` and move to `utils` * fix: update API for tests * fix: relay waitForRemotePeer --------- Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com>
2023-09-27 15:28:07 +05:30
export const pubsubTopicToSingleShardInfo = (
pubsubTopics: PubsubTopic
): SingleShardInfo => {
const parts = pubsubTopics.split("/");
if (
parts.length != 6 ||
parts[1] !== "waku" ||
parts[2] !== "2" ||
parts[3] !== "rs"
)
throw new Error("Invalid pubsub topic");
const clusterId = parseInt(parts[4]);
const shard = parseInt(parts[5]);
if (isNaN(clusterId) || isNaN(shard))
throw new Error("Invalid clusterId or shard");
return {
clusterId,
shard
};
};
feat(static-sharding)!: allow multiple pubSubTopics (#1586) * `ProtocolCreateOptions` now has `pubSubTopic` as `pubSubTopic[]` * chore: update encoder & decoder to support `PubSubTopic` * feat(protocols): allow multiple `PubSubTopic[]` * feat(relay): allow multiple `PubSubTopic[]` * chore(tests): update for new API * chore: minor fixes * chore: make store more robust * fix(relay): correctly set types * chore(address comments): update terminology around configured pubsub topics * chore(address comments): minor refactoring * chore(relay): split `subscribe` into smaller functions for readability & modularity * chore(address comments): refactor `waitForGossipSubPeerInMesh` * chore(store): only allow to query one `pubSubTopic` * fix: `store` bug * feat(tests): add some basic tests * sharding utils * address comments * feat(relay): re-add API for `getMeshPeers` * update error message Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com> * refactor for new API * feat: simplify handling of observers (#1614) * refactor: simplify handling of observers * refactor: Remove redundant PubSubTopic from Observer * use `??` instead of `||` * update `pubsubTopic` to `pubSubTopic` * update `interval` typo * change occurence of `pubsubTopic` to `pubSubTopic` * relay: rm `getAllMeshPeers` and make `pubSubTopics` public * relay: use `push_or_init_map` and move to `utils` * fix: update API for tests * fix: relay waitForRemotePeer --------- Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com>
2023-09-27 15:28:07 +05:30
export function ensurePubsubTopicIsConfigured(
pubsubTopic: PubsubTopic,
configuredTopics: PubsubTopic[]
feat(static-sharding)!: allow multiple pubSubTopics (#1586) * `ProtocolCreateOptions` now has `pubSubTopic` as `pubSubTopic[]` * chore: update encoder & decoder to support `PubSubTopic` * feat(protocols): allow multiple `PubSubTopic[]` * feat(relay): allow multiple `PubSubTopic[]` * chore(tests): update for new API * chore: minor fixes * chore: make store more robust * fix(relay): correctly set types * chore(address comments): update terminology around configured pubsub topics * chore(address comments): minor refactoring * chore(relay): split `subscribe` into smaller functions for readability & modularity * chore(address comments): refactor `waitForGossipSubPeerInMesh` * chore(store): only allow to query one `pubSubTopic` * fix: `store` bug * feat(tests): add some basic tests * sharding utils * address comments * feat(relay): re-add API for `getMeshPeers` * update error message Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com> * refactor for new API * feat: simplify handling of observers (#1614) * refactor: simplify handling of observers * refactor: Remove redundant PubSubTopic from Observer * use `??` instead of `||` * update `pubsubTopic` to `pubSubTopic` * update `interval` typo * change occurence of `pubsubTopic` to `pubSubTopic` * relay: rm `getAllMeshPeers` and make `pubSubTopics` public * relay: use `push_or_init_map` and move to `utils` * fix: update API for tests * fix: relay waitForRemotePeer --------- Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com>
2023-09-27 15:28:07 +05:30
): void {
if (!configuredTopics.includes(pubsubTopic)) {
throw new Error(
`Pubsub topic ${pubsubTopic} has not been configured on this instance. Configured topics are: ${configuredTopics}. Please update your configuration by passing in the topic during Waku node instantiation.`
feat(static-sharding)!: allow multiple pubSubTopics (#1586) * `ProtocolCreateOptions` now has `pubSubTopic` as `pubSubTopic[]` * chore: update encoder & decoder to support `PubSubTopic` * feat(protocols): allow multiple `PubSubTopic[]` * feat(relay): allow multiple `PubSubTopic[]` * chore(tests): update for new API * chore: minor fixes * chore: make store more robust * fix(relay): correctly set types * chore(address comments): update terminology around configured pubsub topics * chore(address comments): minor refactoring * chore(relay): split `subscribe` into smaller functions for readability & modularity * chore(address comments): refactor `waitForGossipSubPeerInMesh` * chore(store): only allow to query one `pubSubTopic` * fix: `store` bug * feat(tests): add some basic tests * sharding utils * address comments * feat(relay): re-add API for `getMeshPeers` * update error message Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com> * refactor for new API * feat: simplify handling of observers (#1614) * refactor: simplify handling of observers * refactor: Remove redundant PubSubTopic from Observer * use `??` instead of `||` * update `pubsubTopic` to `pubSubTopic` * update `interval` typo * change occurence of `pubsubTopic` to `pubSubTopic` * relay: rm `getAllMeshPeers` and make `pubSubTopics` public * relay: use `push_or_init_map` and move to `utils` * fix: update API for tests * fix: relay waitForRemotePeer --------- Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com>
2023-09-27 15:28:07 +05:30
);
}
}
interface ContentTopic {
generation: number;
application: string;
version: string;
topicName: string;
encoding: string;
}
/**
* Given a string, will throw an error if it is not formatted as a valid content topic for autosharding based on https://rfc.vac.dev/spec/51/
* @param contentTopic String to validate
* @returns Object with each content topic field as an attribute
*/
export function ensureValidContentTopic(contentTopic: string): ContentTopic {
const parts = contentTopic.split("/");
if (parts.length < 5 || parts.length > 6) {
throw Error("Content topic format is invalid");
}
// Validate generation field if present
let generation = 0;
if (parts.length == 6) {
generation = parseInt(parts[1]);
if (isNaN(generation)) {
throw new Error("Invalid generation field in content topic");
}
if (generation > 0) {
throw new Error("Generation greater than 0 is not supported");
}
}
// Validate remaining fields
const fields = parts.splice(-4);
// Validate application field
if (fields[0].length == 0) {
throw new Error("Application field cannot be empty");
}
// Validate version field
if (fields[1].length == 0) {
throw new Error("Version field cannot be empty");
}
// Validate topic name field
if (fields[2].length == 0) {
throw new Error("Topic name field cannot be empty");
}
// Validate encoding field
if (fields[3].length == 0) {
throw new Error("Encoding field cannot be empty");
}
return {
generation,
application: fields[0],
version: fields[1],
topicName: fields[2],
encoding: fields[3]
};
}
/**
* Given a string, determines which autoshard index to use for its pubsub topic.
* Based on the algorithm described in the RFC: https://rfc.vac.dev/spec/51//#algorithm
*/
export function contentTopicToShardIndex(
contentTopic: string,
networkShards: number = 8
): number {
const { application, version } = ensureValidContentTopic(contentTopic);
const digest = sha256(
concat([utf8ToBytes(application), utf8ToBytes(version)])
);
const dataview = new DataView(digest.buffer.slice(-8));
return Number(dataview.getBigUint64(0, false) % BigInt(networkShards));
}
export function contentTopicToPubsubTopic(
contentTopic: string,
clusterId: number = 1,
networkShards: number = 8
): string {
const shardIndex = contentTopicToShardIndex(contentTopic, networkShards);
return `/waku/2/rs/${clusterId}/${shardIndex}`;
}
/**
* Given an array of content topics, groups them together by their Pubsub topic as derived using the algorithm for autosharding.
* If any of the content topics are not properly formatted, the function will throw an error.
*/
export function contentTopicsByPubsubTopic(
contentTopics: string[],
clusterId: number = 1,
networkShards: number = 8
): Map<string, Array<string>> {
const groupedContentTopics = new Map();
for (const contentTopic of contentTopics) {
const pubsubTopic = contentTopicToPubsubTopic(
contentTopic,
clusterId,
networkShards
);
let topics = groupedContentTopics.get(pubsubTopic);
if (!topics) {
groupedContentTopics.set(pubsubTopic, []);
topics = groupedContentTopics.get(pubsubTopic);
}
topics.push(contentTopic);
}
return groupedContentTopics;
}
/**
* Used when creating encoders/decoders to determine which pubsub topic to use
*/
export function determinePubsubTopic(
contentTopic: string,
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): string {
if (typeof pubsubTopicShardInfo == "string") {
return pubsubTopicShardInfo;
} else {
return pubsubTopicShardInfo
? pubsubTopicShardInfo.shard
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: contentTopicToPubsubTopic(
contentTopic,
pubsubTopicShardInfo.clusterId
)
: DefaultPubsubTopic;
}
}