mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-05 23:33:08 +00:00
feat: consolidate router info and sharding utils
This commit is contained in:
parent
0be4861c79
commit
01cd00221f
@ -6,7 +6,7 @@ import {
|
||||
PubsubTopic,
|
||||
ShardInfo
|
||||
} from "@waku/interfaces";
|
||||
import { contentTopicToShardIndex, encodeRelayShard } from "@waku/utils";
|
||||
import { AutoShardingRoutingInfo, encodeRelayShard } from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
import { Libp2p } from "libp2p";
|
||||
import sinon from "sinon";
|
||||
@ -29,16 +29,17 @@ describe("ShardReader", function () {
|
||||
|
||||
const testContentTopic = "/test/1/waku-light-push/utf8";
|
||||
const testClusterId = 3;
|
||||
const testShardIndex = contentTopicToShardIndex(
|
||||
testContentTopic,
|
||||
DEFAULT_NUM_SHARDS
|
||||
);
|
||||
|
||||
const testNetworkConfig: AutoSharding = {
|
||||
clusterId: testClusterId,
|
||||
numShardsInCluster: DEFAULT_NUM_SHARDS
|
||||
};
|
||||
|
||||
const testShardIndex = AutoShardingRoutingInfo.fromContentTopic(
|
||||
testContentTopic,
|
||||
testNetworkConfig
|
||||
).shardId;
|
||||
|
||||
const testShardInfo: ShardInfo = {
|
||||
clusterId: testClusterId,
|
||||
shards: [testShardIndex]
|
||||
|
||||
@ -6,11 +6,7 @@ import type {
|
||||
ShardId,
|
||||
ShardInfo
|
||||
} from "@waku/interfaces";
|
||||
import {
|
||||
decodeRelayShard,
|
||||
Logger,
|
||||
pubsubTopicToSingleShardInfo
|
||||
} from "@waku/utils";
|
||||
import { decodeRelayShard, Logger } from "@waku/utils";
|
||||
import { Libp2p } from "libp2p";
|
||||
|
||||
const log = new Logger("shard-reader");
|
||||
@ -65,7 +61,7 @@ export class ShardReader implements IShardReader {
|
||||
pubsubTopic: PubsubTopic
|
||||
): Promise<boolean> {
|
||||
try {
|
||||
const { clusterId, shard } = pubsubTopicToSingleShardInfo(pubsubTopic);
|
||||
const { clusterId, shard } = this.parsePubsubTopic(pubsubTopic);
|
||||
if (clusterId !== this.clusterId) return false;
|
||||
return await this.isPeerOnShard(id, shard);
|
||||
} catch (error) {
|
||||
@ -93,6 +89,34 @@ export class ShardReader implements IShardReader {
|
||||
);
|
||||
}
|
||||
|
||||
private parsePubsubTopic(pubsubTopic: PubsubTopic): {
|
||||
clusterId: ClusterId;
|
||||
shard: ShardId;
|
||||
} {
|
||||
const parts = pubsubTopic.split("/");
|
||||
|
||||
if (
|
||||
parts.length !== 6 ||
|
||||
parts[1] !== "waku" ||
|
||||
parts[2] !== "2" ||
|
||||
parts[3] !== "rs"
|
||||
) {
|
||||
throw new Error("Invalid pubsub topic");
|
||||
}
|
||||
|
||||
const clusterId = parseInt(parts[4], 10);
|
||||
const shard = parseInt(parts[5], 10);
|
||||
|
||||
if (isNaN(clusterId) || isNaN(shard)) {
|
||||
throw new Error("Invalid clusterId or shard");
|
||||
}
|
||||
|
||||
return {
|
||||
clusterId,
|
||||
shard
|
||||
};
|
||||
}
|
||||
|
||||
private async getRelayShards(id: PeerId): Promise<ShardInfo | undefined> {
|
||||
try {
|
||||
const peer = await this.libp2p.peerStore.get(id);
|
||||
|
||||
@ -18,3 +18,4 @@ export * from "./constants.js";
|
||||
export * from "./local_storage.js";
|
||||
export * from "./sharding.js";
|
||||
export * from "./health_status.js";
|
||||
export type { ShardInfo } from "./sharding.js";
|
||||
|
||||
@ -1,10 +1,6 @@
|
||||
import { LightNode, Protocols } from "@waku/interfaces";
|
||||
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
|
||||
import {
|
||||
contentTopicToPubsubTopic,
|
||||
createRoutingInfo,
|
||||
delay
|
||||
} from "@waku/utils";
|
||||
import { createRoutingInfo, delay } from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
@ -64,13 +60,7 @@ describe("High Throughput Messaging", function () {
|
||||
|
||||
await delay(1000);
|
||||
|
||||
await nwaku.ensureSubscriptions([
|
||||
contentTopicToPubsubTopic(
|
||||
ContentTopic,
|
||||
NetworkConfig.clusterId,
|
||||
NetworkConfig.numShardsInCluster
|
||||
)
|
||||
]);
|
||||
await nwaku.ensureSubscriptions([RoutingInfo.pubsubTopic]);
|
||||
|
||||
waku = await createLightNode({ networkConfig: NetworkConfig });
|
||||
await waku.start();
|
||||
|
||||
@ -1,10 +1,6 @@
|
||||
import { LightNode, Protocols } from "@waku/interfaces";
|
||||
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
|
||||
import {
|
||||
contentTopicToPubsubTopic,
|
||||
createRoutingInfo,
|
||||
delay
|
||||
} from "@waku/utils";
|
||||
import { AutoShardingRoutingInfo, createRoutingInfo, delay } from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
@ -62,11 +58,8 @@ describe("Longevity", function () {
|
||||
);
|
||||
|
||||
await nwaku.ensureSubscriptions([
|
||||
contentTopicToPubsubTopic(
|
||||
ContentTopic,
|
||||
networkConfig.clusterId,
|
||||
networkConfig.numShardsInCluster
|
||||
)
|
||||
AutoShardingRoutingInfo.fromContentTopic(ContentTopic, networkConfig)
|
||||
.pubsubTopic
|
||||
]);
|
||||
|
||||
waku = await createLightNode({ networkConfig });
|
||||
|
||||
@ -1,10 +1,6 @@
|
||||
import { LightNode, Protocols } from "@waku/interfaces";
|
||||
import { createDecoder, createLightNode, utf8ToBytes } from "@waku/sdk";
|
||||
import {
|
||||
contentTopicToPubsubTopic,
|
||||
createRoutingInfo,
|
||||
delay
|
||||
} from "@waku/utils";
|
||||
import { AutoShardingRoutingInfo, createRoutingInfo, delay } from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
@ -68,11 +64,8 @@ describe("Throughput Sanity Checks - Different Message Sizes", function () {
|
||||
await delay(1000);
|
||||
|
||||
await nwaku.ensureSubscriptions([
|
||||
contentTopicToPubsubTopic(
|
||||
ContentTopic,
|
||||
networkConfig.clusterId,
|
||||
networkConfig.numShardsInCluster
|
||||
)
|
||||
AutoShardingRoutingInfo.fromContentTopic(ContentTopic, networkConfig)
|
||||
.pubsubTopic
|
||||
]);
|
||||
|
||||
waku = await createLightNode({ networkConfig });
|
||||
|
||||
@ -3,11 +3,11 @@ import { peerIdFromString } from "@libp2p/peer-id";
|
||||
import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
|
||||
import { ContentTopic, PubsubTopic } from "@waku/interfaces";
|
||||
import {
|
||||
formatPubsubTopic,
|
||||
isAutoSharding,
|
||||
isDefined,
|
||||
isStaticSharding,
|
||||
RoutingInfo
|
||||
RoutingInfo,
|
||||
StaticShardingRoutingInfo
|
||||
} from "@waku/utils";
|
||||
import { Logger } from "@waku/utils";
|
||||
import pRetry from "p-retry";
|
||||
@ -279,10 +279,10 @@ export class ServiceNode {
|
||||
if (this.args?.shard) {
|
||||
if (this.args?.shard.length > 1)
|
||||
throw "More that one shard passed, not supported";
|
||||
const pubsubTopic = formatPubsubTopic(
|
||||
this.args.clusterId ?? DefaultTestNetworkConfig.clusterId,
|
||||
this.args?.shard[0]
|
||||
);
|
||||
const pubsubTopic = StaticShardingRoutingInfo.fromShard(
|
||||
this.args?.shard[0],
|
||||
{ clusterId: this.args.clusterId ?? DefaultTestNetworkConfig.clusterId }
|
||||
).pubsubTopic;
|
||||
return this.pubsubTopicMessages(pubsubTopic);
|
||||
}
|
||||
|
||||
|
||||
@ -6,10 +6,10 @@ import {
|
||||
} from "@waku/interfaces";
|
||||
import { createLightNode } from "@waku/sdk";
|
||||
import {
|
||||
contentTopicToPubsubTopic,
|
||||
formatPubsubTopic,
|
||||
AutoShardingRoutingInfo,
|
||||
isAutoShardingRoutingInfo,
|
||||
RoutingInfo
|
||||
RoutingInfo,
|
||||
StaticShardingRoutingInfo
|
||||
} from "@waku/utils";
|
||||
import { Context } from "mocha";
|
||||
import pRetry from "p-retry";
|
||||
@ -75,7 +75,11 @@ export async function runMultipleNodes(
|
||||
if (customArgs?.shard) {
|
||||
const shards = customArgs?.shard ?? [];
|
||||
for (const s of shards) {
|
||||
pubsubTopics.push(formatPubsubTopic(routingInfo.clusterId, s));
|
||||
pubsubTopics.push(
|
||||
StaticShardingRoutingInfo.fromShard(s, {
|
||||
clusterId: routingInfo.clusterId
|
||||
}).pubsubTopic
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -83,11 +87,8 @@ export async function runMultipleNodes(
|
||||
const contentTopics = customArgs?.contentTopic ?? [];
|
||||
for (const ct of contentTopics) {
|
||||
pubsubTopics.push(
|
||||
contentTopicToPubsubTopic(
|
||||
ct,
|
||||
routingInfo.clusterId,
|
||||
routingInfo.networkConfig.numShardsInCluster
|
||||
)
|
||||
AutoShardingRoutingInfo.fromContentTopic(ct, routingInfo.networkConfig)
|
||||
.pubsubTopic
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import { createDecoder, createEncoder } from "@waku/core";
|
||||
import { LightNode } from "@waku/interfaces";
|
||||
import { Protocols, utf8ToBytes } from "@waku/sdk";
|
||||
import { createRoutingInfo, formatPubsubTopic } from "@waku/utils";
|
||||
import { createRoutingInfo, StaticShardingRoutingInfo } from "@waku/utils";
|
||||
|
||||
import {
|
||||
afterEachCustom,
|
||||
@ -78,7 +78,9 @@ const runTests = (strictCheckNodes: boolean): void => {
|
||||
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
|
||||
|
||||
await nwaku2.ensureSubscriptions([
|
||||
formatPubsubTopic(TestClusterId, shardId)
|
||||
StaticShardingRoutingInfo.fromShard(shardId, {
|
||||
clusterId: TestClusterId
|
||||
}).pubsubTopic
|
||||
]);
|
||||
|
||||
const messageCollector2 = new MessageCollector();
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import { createDecoder, createEncoder } from "@waku/core";
|
||||
import {
|
||||
contentTopicToShardIndex,
|
||||
AutoShardingRoutingInfo,
|
||||
createRoutingInfo,
|
||||
Logger
|
||||
} from "@waku/utils";
|
||||
@ -11,14 +11,14 @@ export const log = new Logger("test:filter");
|
||||
export const TestContentTopic = "/test/1/waku-filter/default";
|
||||
export const TestClusterId = 2;
|
||||
export const TestNumShardsInCluster = 8;
|
||||
export const TestShardIndex = contentTopicToShardIndex(
|
||||
TestContentTopic,
|
||||
TestNumShardsInCluster
|
||||
);
|
||||
export const TestNetworkConfig = {
|
||||
clusterId: TestClusterId,
|
||||
numShardsInCluster: TestNumShardsInCluster
|
||||
};
|
||||
export const TestShardIndex = AutoShardingRoutingInfo.fromContentTopic(
|
||||
TestContentTopic,
|
||||
TestNetworkConfig
|
||||
).shardId;
|
||||
export const TestRoutingInfo = createRoutingInfo(TestNetworkConfig, {
|
||||
contentTopic: TestContentTopic
|
||||
});
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import { AutoSharding, LightNode } from "@waku/interfaces";
|
||||
import { createEncoder, utf8ToBytes } from "@waku/sdk";
|
||||
import { contentTopicToPubsubTopic, createRoutingInfo } from "@waku/utils";
|
||||
import { AutoShardingRoutingInfo, createRoutingInfo } from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
@ -136,7 +136,10 @@ describe("Autosharding: Running Nodes", function () {
|
||||
it("Wrong topic", async function () {
|
||||
const wrongTopic = "wrong_format";
|
||||
try {
|
||||
contentTopicToPubsubTopic(wrongTopic, clusterId, 8);
|
||||
AutoShardingRoutingInfo.fromContentTopic(wrongTopic, {
|
||||
clusterId,
|
||||
numShardsInCluster: 8
|
||||
});
|
||||
throw new Error("Wrong topic should've thrown an error");
|
||||
} catch (err) {
|
||||
if (
|
||||
|
||||
@ -3,7 +3,7 @@ import type { PeerId } from "@libp2p/interface";
|
||||
import { wakuPeerExchangeDiscovery } from "@waku/discovery";
|
||||
import type { AutoSharding, StaticSharding } from "@waku/interfaces";
|
||||
import { createLightNode, LightNode, Tags } from "@waku/sdk";
|
||||
import { contentTopicToShardIndex } from "@waku/utils";
|
||||
import { AutoShardingRoutingInfo } from "@waku/utils";
|
||||
import chai, { expect } from "chai";
|
||||
import chaiAsPromised from "chai-as-promised";
|
||||
import Sinon, { SinonSpy } from "sinon";
|
||||
@ -201,7 +201,14 @@ describe("Autosharding: Peer Management", function () {
|
||||
const ContentTopic = "/myapp/1/latest/proto";
|
||||
const clusterId = 8;
|
||||
const numShardsInCluster = 8;
|
||||
const Shard = [contentTopicToShardIndex(ContentTopic, numShardsInCluster)];
|
||||
const networkConfig: AutoSharding = {
|
||||
clusterId,
|
||||
numShardsInCluster
|
||||
};
|
||||
const Shard = [
|
||||
AutoShardingRoutingInfo.fromContentTopic(ContentTopic, networkConfig)
|
||||
.shardId
|
||||
];
|
||||
|
||||
describe("Peer Exchange", function () {
|
||||
let waku: LightNode;
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import { IMessage, type LightNode } from "@waku/interfaces";
|
||||
import { formatPubsubTopic } from "@waku/utils";
|
||||
import { StaticShardingRoutingInfo } from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
@ -68,7 +68,9 @@ describe("Waku Store, error handling", function () {
|
||||
});
|
||||
|
||||
it("Query Generator, No message returned", async function () {
|
||||
const WrongTestPubsubTopic = formatPubsubTopic(43, 53);
|
||||
const WrongTestPubsubTopic = StaticShardingRoutingInfo.fromShard(53, {
|
||||
clusterId: 43
|
||||
}).pubsubTopic;
|
||||
const messages = await processQueriedMessages(
|
||||
waku,
|
||||
[TestDecoder],
|
||||
|
||||
@ -2,7 +2,7 @@ import type { LightNode, RelayNode } from "@waku/interfaces";
|
||||
import { Protocols } from "@waku/interfaces";
|
||||
import { createRelayNode } from "@waku/relay";
|
||||
import { createLightNode } from "@waku/sdk";
|
||||
import { formatPubsubTopic } from "@waku/utils";
|
||||
import { StaticShardingRoutingInfo } from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
@ -261,7 +261,9 @@ describe("Wait for remote peer", function () {
|
||||
[nwaku, waku1] = await runRelayNodes(this, { clusterId: 0 }, [0]);
|
||||
const multiAddrWithId = await nwaku.getMultiaddrWithId();
|
||||
|
||||
const peers = waku1.relay.getMeshPeers(formatPubsubTopic(0, 0));
|
||||
const peers = waku1.relay.getMeshPeers(
|
||||
StaticShardingRoutingInfo.fromShard(0, { clusterId: 0 }).pubsubTopic
|
||||
);
|
||||
|
||||
const nimPeerId = multiAddrWithId.getPeerId();
|
||||
|
||||
|
||||
@ -1,369 +0,0 @@
|
||||
import { DEFAULT_CLUSTER_ID } from "@waku/interfaces";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
contentTopicsByPubsubTopic,
|
||||
contentTopicToPubsubTopic,
|
||||
contentTopicToShardIndex,
|
||||
ensureValidContentTopic,
|
||||
pubsubTopicToSingleShardInfo
|
||||
} from "./index.js";
|
||||
|
||||
const ClusterId = 0;
|
||||
const NumShardsInCluster = 8;
|
||||
|
||||
const testInvalidCases = (
|
||||
contentTopics: string[],
|
||||
expectedError: string
|
||||
): void => {
|
||||
for (const invalidTopic of contentTopics) {
|
||||
expect(() => ensureValidContentTopic(invalidTopic)).to.throw(expectedError);
|
||||
}
|
||||
};
|
||||
|
||||
describe("ensureValidContentTopic", () => {
|
||||
it("does not throw on valid cases", () => {
|
||||
const validTopics = [
|
||||
"/0/myapp/1/mytopic/cbor",
|
||||
"/myapp/1/mytopic/cbor",
|
||||
"/myapp/v1.1/mytopic/cbor"
|
||||
];
|
||||
for (const validTopic of validTopics) {
|
||||
expect(() => ensureValidContentTopic(validTopic)).to.not.throw;
|
||||
}
|
||||
});
|
||||
it("throws on empty content topic", () => {
|
||||
testInvalidCases(["", " ", " "], "Content topic format is invalid");
|
||||
});
|
||||
|
||||
it("throws on content topic with too few or too many fields", () => {
|
||||
testInvalidCases(
|
||||
[
|
||||
"myContentTopic",
|
||||
"myapp1mytopiccbor/",
|
||||
" /myapp/1/mytopic",
|
||||
"/myapp/1/mytopic",
|
||||
"/0/myapp/1/mytopic/cbor/extra"
|
||||
],
|
||||
"Content topic format is invalid"
|
||||
);
|
||||
});
|
||||
|
||||
it("throws on content topic with non-number generation field", () => {
|
||||
testInvalidCases(
|
||||
[
|
||||
"/a/myapp/1/mytopic/cbor",
|
||||
"/ /myapp/1/mytopic/cbor",
|
||||
"/_/myapp/1/mytopic/cbor",
|
||||
"/$/myapp/1/mytopic/cbor"
|
||||
],
|
||||
"Invalid generation field in content topic"
|
||||
);
|
||||
});
|
||||
|
||||
// Note that this test case should be removed once Waku supports other generations
|
||||
it("throws on content topic with generation field greater than 0", () => {
|
||||
testInvalidCases(
|
||||
[
|
||||
"/1/myapp/1/mytopic/cbor",
|
||||
"/2/myapp/1/mytopic/cbor",
|
||||
"/3/myapp/1/mytopic/cbor",
|
||||
"/1000/myapp/1/mytopic/cbor",
|
||||
"/1/toychat/2/huilong/proto",
|
||||
"/1/statusim/1/community/cbor"
|
||||
],
|
||||
"Generation greater than 0 is not supported"
|
||||
);
|
||||
});
|
||||
|
||||
it("throws on content topic with empty application field", () => {
|
||||
testInvalidCases(
|
||||
["/0//1/mytopic/cbor"],
|
||||
"Application field cannot be empty"
|
||||
);
|
||||
});
|
||||
|
||||
it("throws on content topic with empty version field", () => {
|
||||
testInvalidCases(
|
||||
["/0/myapp//mytopic/cbor"],
|
||||
"Version field cannot be empty"
|
||||
);
|
||||
});
|
||||
|
||||
it("throws on content topic with empty topic name field", () => {
|
||||
testInvalidCases(["/0/myapp/1//cbor"], "Topic name field cannot be empty");
|
||||
});
|
||||
|
||||
it("throws on content topic with empty encoding field", () => {
|
||||
testInvalidCases(["/0/myapp/1/mytopic/"], "Encoding field cannot be empty");
|
||||
});
|
||||
});
|
||||
|
||||
describe("contentTopicToShardIndex", () => {
|
||||
const contentTopicsWithExpectedShards: [string, number][] = [
|
||||
["/toychat/2/huilong/proto", 3],
|
||||
["/myapp/1/latest/proto", 0],
|
||||
["/waku/2/content/test.js", 1],
|
||||
["/toychat/2/huilong/proto", 3],
|
||||
["/0/toychat/2/huilong/proto", 3],
|
||||
["/statusim/1/community/cbor", 4],
|
||||
["/0/statusim/1/community/cbor", 4],
|
||||
["/app/22/sometopic/someencoding", 2],
|
||||
["/app/27/sometopic/someencoding", 5],
|
||||
["/app/20/sometopic/someencoding", 7],
|
||||
["/app/29/sometopic/someencoding", 6]
|
||||
];
|
||||
contentTopicsWithExpectedShards.forEach(([topic, expectedShard]) => {
|
||||
it(`should correctly map ${topic} to shard index ${expectedShard}`, () => {
|
||||
expect(contentTopicToShardIndex(topic, NumShardsInCluster)).to.eq(
|
||||
expectedShard
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
const testCases: [number, string, number][] = [
|
||||
[16, "/app/20/sometopic/someencoding", 15],
|
||||
[2, "/app/20/sometopic/someencoding", 1],
|
||||
[1, "/app/20/sometopic/someencoding", 0]
|
||||
];
|
||||
|
||||
testCases.forEach(([networkShards, topic, expectedShard]) => {
|
||||
it(`should correctly map ${topic} to shard index ${expectedShard} with networkShards ${networkShards}`, () => {
|
||||
expect(contentTopicToShardIndex(topic, networkShards)).to.eq(
|
||||
expectedShard
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
it("topics with same application and version share the same shard", () => {
|
||||
const contentTopics: [string, string][] = [
|
||||
["/toychat/2/huilong/proto", "/toychat/2/othertopic/otherencoding"],
|
||||
["/myapp/1/latest/proto", "/myapp/1/new/proto"],
|
||||
["/waku/2/content/test.js", "/waku/2/users/proto"]
|
||||
];
|
||||
for (const [topic1, topic2] of contentTopics) {
|
||||
expect(contentTopicToShardIndex(topic1, NumShardsInCluster)).to.eq(
|
||||
contentTopicToShardIndex(topic2, NumShardsInCluster)
|
||||
);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("contentTopicsByPubsubTopic", () => {
|
||||
it("groups content topics by expected pubsub topic", () => {
|
||||
const contentTopics = ["/toychat/2/huilong/proto", "/myapp/1/latest/proto"];
|
||||
const grouped = contentTopicsByPubsubTopic(
|
||||
contentTopics,
|
||||
ClusterId,
|
||||
NumShardsInCluster
|
||||
);
|
||||
|
||||
for (const contentTopic of contentTopics) {
|
||||
const pubsubTopic = contentTopicToPubsubTopic(contentTopic, 0, 8);
|
||||
|
||||
expect(grouped.get(pubsubTopic)?.includes(contentTopic)).to.be.true;
|
||||
}
|
||||
});
|
||||
|
||||
it("groups multiple content topics into the same pubsub topic when they share the same shard index", () => {
|
||||
const contentTopics = [
|
||||
"/app/22/sometopic/someencoding",
|
||||
"/app/22/anothertopic/otherencoding"
|
||||
];
|
||||
const grouped = contentTopicsByPubsubTopic(
|
||||
contentTopics,
|
||||
ClusterId,
|
||||
NumShardsInCluster
|
||||
);
|
||||
expect(grouped.size).to.eq(1); // Only one pubsub topic expected
|
||||
const pubsubTopic = contentTopicToPubsubTopic(contentTopics[0], 0, 8);
|
||||
expect(grouped.get(pubsubTopic)?.length).to.eq(2); // Both topics should be grouped under the same pubsub topic
|
||||
});
|
||||
|
||||
it("handles different clusterIds correctly", () => {
|
||||
const contentTopics = ["/app/22/sometopic/someencoding"];
|
||||
const clusterId1 = 3;
|
||||
const clusterId2 = 2;
|
||||
const grouped1 = contentTopicsByPubsubTopic(
|
||||
contentTopics,
|
||||
clusterId1,
|
||||
NumShardsInCluster
|
||||
);
|
||||
const grouped2 = contentTopicsByPubsubTopic(
|
||||
contentTopics,
|
||||
clusterId2,
|
||||
NumShardsInCluster
|
||||
);
|
||||
const pubsubTopic1 = contentTopicToPubsubTopic(
|
||||
contentTopics[0],
|
||||
clusterId1,
|
||||
8
|
||||
);
|
||||
const pubsubTopic2 = contentTopicToPubsubTopic(
|
||||
contentTopics[0],
|
||||
clusterId2,
|
||||
8
|
||||
);
|
||||
expect(pubsubTopic1).not.to.equal(pubsubTopic2);
|
||||
expect(grouped1.has(pubsubTopic1)).to.be.true;
|
||||
expect(grouped1.has(pubsubTopic2)).to.be.false;
|
||||
expect(grouped2.has(pubsubTopic1)).to.be.false;
|
||||
expect(grouped2.has(pubsubTopic2)).to.be.true;
|
||||
});
|
||||
|
||||
it("handles different networkShards values correctly", () => {
|
||||
const contentTopics = ["/app/20/sometopic/someencoding"];
|
||||
const networkShards1 = 8;
|
||||
const networkShards2 = 16;
|
||||
const grouped1 = contentTopicsByPubsubTopic(
|
||||
contentTopics,
|
||||
DEFAULT_CLUSTER_ID,
|
||||
networkShards1
|
||||
);
|
||||
const grouped2 = contentTopicsByPubsubTopic(
|
||||
contentTopics,
|
||||
DEFAULT_CLUSTER_ID,
|
||||
networkShards2
|
||||
);
|
||||
const pubsubTopic1 = contentTopicToPubsubTopic(
|
||||
contentTopics[0],
|
||||
DEFAULT_CLUSTER_ID,
|
||||
networkShards1
|
||||
);
|
||||
const pubsubTopic2 = contentTopicToPubsubTopic(
|
||||
contentTopics[0],
|
||||
DEFAULT_CLUSTER_ID,
|
||||
networkShards2
|
||||
);
|
||||
expect(pubsubTopic1).not.to.equal(pubsubTopic2);
|
||||
expect(grouped1.has(pubsubTopic1)).to.be.true;
|
||||
expect(grouped1.has(pubsubTopic2)).to.be.false;
|
||||
expect(grouped2.has(pubsubTopic1)).to.be.false;
|
||||
expect(grouped2.has(pubsubTopic2)).to.be.true;
|
||||
});
|
||||
|
||||
it("throws an error for improperly formatted content topics", () => {
|
||||
const invalidContentTopics = ["/invalid/format"];
|
||||
expect(() =>
|
||||
contentTopicsByPubsubTopic(
|
||||
invalidContentTopics,
|
||||
ClusterId,
|
||||
NumShardsInCluster
|
||||
)
|
||||
).to.throw();
|
||||
});
|
||||
});
|
||||
|
||||
describe("pubsubTopicToSingleShardInfo with various invalid formats", () => {
|
||||
const invalidTopics = [
|
||||
"/waku/1/rs/1/2", // Invalid Waku version
|
||||
"/waku/2/r/1/2", // Invalid path segment
|
||||
"/incorrect/format", // Completely incorrect format
|
||||
"/waku/2/rs", // Missing both clusterId and shard
|
||||
"/waku/2/rs/1/2/extra" // Extra trailing data
|
||||
];
|
||||
|
||||
it("should extract SingleShardInfo from a valid PubsubTopic", () => {
|
||||
const topic = "/waku/2/rs/2/2";
|
||||
const expectedInfo = { clusterId: 2, shard: 2 };
|
||||
expect(pubsubTopicToSingleShardInfo(topic)).to.deep.equal(expectedInfo);
|
||||
});
|
||||
|
||||
invalidTopics.forEach((topic) => {
|
||||
it(`should throw an error for invalid PubsubTopic format: ${topic}`, () => {
|
||||
expect(() => pubsubTopicToSingleShardInfo(topic)).to.throw(
|
||||
"Invalid pubsub topic"
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
const nonNumericValues = ["x", "y", "$", "!", "\\", "-", "", " "];
|
||||
nonNumericValues.forEach((value) => {
|
||||
it(`should throw an error for non-numeric clusterId: /waku/2/rs/${value}/1`, () => {
|
||||
expect(() =>
|
||||
pubsubTopicToSingleShardInfo(`/waku/2/rs/${value}/1`)
|
||||
).to.throw("Invalid clusterId or shard");
|
||||
});
|
||||
|
||||
it(`should throw an error for non-numeric shard: /waku/2/rs/1/${value}`, () => {
|
||||
expect(() =>
|
||||
pubsubTopicToSingleShardInfo(`/waku/2/rs/1/${value}`)
|
||||
).to.throw("Invalid clusterId or shard");
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
// describe("ensureShardingConfigured", () => {
|
||||
// it("should return valid sharding parameters for static sharding", () => {
|
||||
// const shardInfo = { clusterId: 1, shards: [0, 1] };
|
||||
// const result = ensureShardingConfigured(shardInfo);
|
||||
// expect(result.shardInfo).to.deep.include({
|
||||
// clusterId: 1,
|
||||
// shards: [0, 1]
|
||||
// });
|
||||
// expect(result.shardInfo).to.deep.include({ clusterId: 1, shards: [0, 1] });
|
||||
// expect(result.pubsubTopics).to.have.members([
|
||||
// "/waku/2/rs/1/0",
|
||||
// "/waku/2/rs/1/1"
|
||||
// ]);
|
||||
// });
|
||||
//
|
||||
// it("should return valid sharding parameters for content topics autosharding", () => {
|
||||
// const contentTopicInfo = { contentTopics: ["/app/v1/topic1/proto"] };
|
||||
// const result = ensureShardingConfigured(contentTopicInfo);
|
||||
// const expectedPubsubTopic = contentTopicToPubsubTopic(
|
||||
// "/app/v1/topic1/proto",
|
||||
// DEFAULT_CLUSTER_ID
|
||||
// );
|
||||
// expect(result.shardInfo.shards).to.include(
|
||||
// contentTopicToShardIndex("/app/v1/topic1/proto")
|
||||
// );
|
||||
// expect(result.pubsubTopics).to.include(expectedPubsubTopic);
|
||||
// });
|
||||
//
|
||||
// it("should throw an error for missing sharding configuration", () => {
|
||||
// const shardInfo = {} as any as NetworkConfig;
|
||||
// expect(() => ensureShardingConfigured(shardInfo)).to.throw();
|
||||
// });
|
||||
//
|
||||
// it("handles empty shards array correctly", () => {
|
||||
// const shardInfo = { clusterId: 1, shards: [] };
|
||||
// expect(() => ensureShardingConfigured(shardInfo)).to.throw();
|
||||
// });
|
||||
//
|
||||
// it("handles empty contentTopics array correctly", () => {
|
||||
// const shardInfo = { contentTopics: [] };
|
||||
// expect(() => ensureShardingConfigured(shardInfo)).to.throw();
|
||||
// });
|
||||
// });
|
||||
//
|
||||
// describe("contentTopicToPubsubTopic", () => {
|
||||
// it("should correctly map a content topic to a pubsub topic", () => {
|
||||
// const contentTopic = "/app/v1/topic1/proto";
|
||||
// expect(contentTopicToPubsubTopic(contentTopic)).to.equal("/waku/2/rs/1/4");
|
||||
// });
|
||||
//
|
||||
// it("should map different content topics to different pubsub topics based on shard index", () => {
|
||||
// const contentTopic1 = "/app/v1/topic1/proto";
|
||||
// const contentTopic2 = "/app/v2/topic2/proto";
|
||||
// const pubsubTopic1 = contentTopicToPubsubTopic(contentTopic1);
|
||||
// const pubsubTopic2 = contentTopicToPubsubTopic(contentTopic2);
|
||||
// expect(pubsubTopic1).not.to.equal(pubsubTopic2);
|
||||
// });
|
||||
//
|
||||
// it("should use the provided clusterId for the pubsub topic", () => {
|
||||
// const contentTopic = "/app/v1/topic1/proto";
|
||||
// const clusterId = 2;
|
||||
// expect(contentTopicToPubsubTopic(contentTopic, clusterId)).to.equal(
|
||||
// "/waku/2/rs/2/4"
|
||||
// );
|
||||
// });
|
||||
//
|
||||
// it("should correctly map a content topic to a pubsub topic for different network shard sizes", () => {
|
||||
// const contentTopic = "/app/v1/topic1/proto";
|
||||
// const networkShards = 16;
|
||||
// expect(contentTopicToPubsubTopic(contentTopic, 1, networkShards)).to.equal(
|
||||
// "/waku/2/rs/1/4"
|
||||
// );
|
||||
// });
|
||||
// });
|
||||
@ -1,165 +1,2 @@
|
||||
import { sha256 } from "@noble/hashes/sha256";
|
||||
import {
|
||||
type ClusterId,
|
||||
ContentTopic,
|
||||
PubsubTopic,
|
||||
type ShardId
|
||||
} from "@waku/interfaces";
|
||||
|
||||
import { concat, utf8ToBytes } from "../../bytes/index.js";
|
||||
|
||||
export * from "./type_guards.js";
|
||||
export * from "./routing_info.js";
|
||||
|
||||
export const formatPubsubTopic = (
|
||||
clusterId: ClusterId,
|
||||
shard: ShardId
|
||||
): PubsubTopic => {
|
||||
return `/waku/2/rs/${clusterId}/${shard}`;
|
||||
};
|
||||
|
||||
/**
|
||||
* @deprecated will be removed
|
||||
*/
|
||||
export const pubsubTopicToSingleShardInfo = (
|
||||
pubsubTopics: PubsubTopic
|
||||
): { clusterId: ClusterId; shard: ShardId } => {
|
||||
const parts = pubsubTopics.split("/");
|
||||
|
||||
if (
|
||||
parts.length != 6 ||
|
||||
parts[1] !== "waku" ||
|
||||
parts[2] !== "2" ||
|
||||
parts[3] !== "rs"
|
||||
)
|
||||
throw new Error("Invalid pubsub topic");
|
||||
|
||||
const clusterId = parseInt(parts[4]);
|
||||
const shard = parseInt(parts[5]);
|
||||
|
||||
if (isNaN(clusterId) || isNaN(shard))
|
||||
throw new Error("Invalid clusterId or shard");
|
||||
|
||||
return {
|
||||
clusterId,
|
||||
shard
|
||||
};
|
||||
};
|
||||
|
||||
interface ParsedContentTopic {
|
||||
generation: number;
|
||||
application: string;
|
||||
version: string;
|
||||
topicName: string;
|
||||
encoding: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a string, will throw an error if it is not formatted as a valid content topic for autosharding based on https://rfc.vac.dev/spec/51/
|
||||
* @param contentTopic String to validate
|
||||
* @returns Object with each content topic field as an attribute
|
||||
*/
|
||||
export function ensureValidContentTopic(
|
||||
contentTopic: ContentTopic
|
||||
): ParsedContentTopic {
|
||||
const parts = (contentTopic as string).split("/");
|
||||
if (parts.length < 5 || parts.length > 6) {
|
||||
throw Error(`Content topic format is invalid: ${contentTopic}`);
|
||||
}
|
||||
// Validate generation field if present
|
||||
let generation = 0;
|
||||
if (parts.length == 6) {
|
||||
generation = parseInt(parts[1]);
|
||||
if (isNaN(generation)) {
|
||||
throw new Error(
|
||||
`Invalid generation field in content topic: ${contentTopic}`
|
||||
);
|
||||
}
|
||||
if (generation > 0) {
|
||||
throw new Error(
|
||||
`Generation greater than 0 is not supported: ${contentTopic}`
|
||||
);
|
||||
}
|
||||
}
|
||||
// Validate remaining fields
|
||||
const fields = parts.splice(-4);
|
||||
// Validate application field
|
||||
if (fields[0].length == 0) {
|
||||
throw new Error(`Application field cannot be empty: ${contentTopic}`);
|
||||
}
|
||||
// Validate version field
|
||||
if (fields[1].length == 0) {
|
||||
throw new Error(`Version field cannot be empty: ${contentTopic}`);
|
||||
}
|
||||
// Validate topic name field
|
||||
if (fields[2].length == 0) {
|
||||
throw new Error(`Topic name field cannot be empty: ${contentTopic}`);
|
||||
}
|
||||
// Validate encoding field
|
||||
if (fields[3].length == 0) {
|
||||
throw new Error(`Encoding field cannot be empty: ${contentTopic}`);
|
||||
}
|
||||
|
||||
return {
|
||||
generation,
|
||||
application: fields[0],
|
||||
version: fields[1],
|
||||
topicName: fields[2],
|
||||
encoding: fields[3]
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a string, determines which autoshard index to use for its pubsub topic.
|
||||
* Based on the algorithm described in the RFC: https://rfc.vac.dev/spec/51//#algorithm
|
||||
*/
|
||||
export function contentTopicToShardIndex(
|
||||
contentTopic: ContentTopic,
|
||||
numShardsInCluster: number
|
||||
): number {
|
||||
const { application, version } = ensureValidContentTopic(contentTopic);
|
||||
const digest = sha256(
|
||||
concat([utf8ToBytes(application), utf8ToBytes(version)])
|
||||
);
|
||||
const dataview = new DataView(digest.buffer.slice(-8));
|
||||
return Number(dataview.getBigUint64(0, false) % BigInt(numShardsInCluster));
|
||||
}
|
||||
|
||||
export function contentTopicToPubsubTopic(
|
||||
contentTopic: ContentTopic,
|
||||
clusterId: number,
|
||||
numShardsInCluster: number
|
||||
): string {
|
||||
if (!contentTopic) {
|
||||
throw Error("Content topic must be specified");
|
||||
}
|
||||
|
||||
const shardIndex = contentTopicToShardIndex(contentTopic, numShardsInCluster);
|
||||
return `/waku/2/rs/${clusterId}/${shardIndex}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given an array of content topics, groups them together by their Pubsub topic as derived using the algorithm for autosharding.
|
||||
* If any of the content topics are not properly formatted, the function will throw an error.
|
||||
*/
|
||||
export function contentTopicsByPubsubTopic(
|
||||
contentTopics: ContentTopic[],
|
||||
clusterId: number,
|
||||
networkShards: number
|
||||
): Map<string, Array<string>> {
|
||||
const groupedContentTopics = new Map();
|
||||
for (const contentTopic of contentTopics) {
|
||||
const pubsubTopic = contentTopicToPubsubTopic(
|
||||
contentTopic,
|
||||
clusterId,
|
||||
networkShards
|
||||
);
|
||||
let topics = groupedContentTopics.get(pubsubTopic);
|
||||
if (!topics) {
|
||||
groupedContentTopics.set(pubsubTopic, []);
|
||||
topics = groupedContentTopics.get(pubsubTopic);
|
||||
}
|
||||
topics.push(contentTopic);
|
||||
}
|
||||
return groupedContentTopics;
|
||||
}
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import { sha256 } from "@noble/hashes/sha256";
|
||||
import type {
|
||||
AutoSharding,
|
||||
ClusterId,
|
||||
@ -9,25 +10,133 @@ import type {
|
||||
StaticSharding
|
||||
} from "@waku/interfaces";
|
||||
|
||||
import {
|
||||
contentTopicToShardIndex,
|
||||
ensureValidContentTopic,
|
||||
formatPubsubTopic,
|
||||
isAutoSharding,
|
||||
pubsubTopicToSingleShardInfo
|
||||
} from "./index.js";
|
||||
import { concat, utf8ToBytes } from "../../bytes/index.js";
|
||||
|
||||
import { isAutoSharding } from "./index.js";
|
||||
|
||||
const formatPubsubTopic = (
|
||||
clusterId: ClusterId,
|
||||
shard: ShardId
|
||||
): PubsubTopic => {
|
||||
return `/waku/2/rs/${clusterId}/${shard}`;
|
||||
};
|
||||
|
||||
interface ParsedContentTopic {
|
||||
generation: number;
|
||||
application: string;
|
||||
version: string;
|
||||
topicName: string;
|
||||
encoding: string;
|
||||
}
|
||||
|
||||
function ensureValidContentTopic(
|
||||
contentTopic: ContentTopic
|
||||
): ParsedContentTopic {
|
||||
const parts = (contentTopic as string).split("/");
|
||||
if (parts.length < 5 || parts.length > 6) {
|
||||
throw Error(`Content topic format is invalid: ${contentTopic}`);
|
||||
}
|
||||
// Validate generation field if present
|
||||
let generation = 0;
|
||||
if (parts.length == 6) {
|
||||
generation = parseInt(parts[1]);
|
||||
if (isNaN(generation)) {
|
||||
throw new Error(
|
||||
`Invalid generation field in content topic: ${contentTopic}`
|
||||
);
|
||||
}
|
||||
if (generation > 0) {
|
||||
throw new Error(
|
||||
`Generation greater than 0 is not supported: ${contentTopic}`
|
||||
);
|
||||
}
|
||||
}
|
||||
// Validate remaining fields
|
||||
const fields = parts.splice(-4);
|
||||
// Validate application field
|
||||
if (fields[0].length == 0) {
|
||||
throw new Error(`Application field cannot be empty: ${contentTopic}`);
|
||||
}
|
||||
// Validate version field
|
||||
if (fields[1].length == 0) {
|
||||
throw new Error(`Version field cannot be empty: ${contentTopic}`);
|
||||
}
|
||||
// Validate topic name field
|
||||
if (fields[2].length == 0) {
|
||||
throw new Error(`Topic name field cannot be empty: ${contentTopic}`);
|
||||
}
|
||||
// Validate encoding field
|
||||
if (fields[3].length == 0) {
|
||||
throw new Error(`Encoding field cannot be empty: ${contentTopic}`);
|
||||
}
|
||||
|
||||
return {
|
||||
generation,
|
||||
application: fields[0],
|
||||
version: fields[1],
|
||||
topicName: fields[2],
|
||||
encoding: fields[3]
|
||||
};
|
||||
}
|
||||
|
||||
function contentTopicToShardIndex(
|
||||
contentTopic: ContentTopic,
|
||||
numShardsInCluster: number
|
||||
): number {
|
||||
const { application, version } = ensureValidContentTopic(contentTopic);
|
||||
const digest = sha256(
|
||||
concat([utf8ToBytes(application), utf8ToBytes(version)])
|
||||
);
|
||||
const dataview = new DataView(digest.buffer.slice(-8));
|
||||
return Number(dataview.getBigUint64(0, false) % BigInt(numShardsInCluster));
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated will be removed
|
||||
*/
|
||||
const pubsubTopicToSingleShardInfo = (
|
||||
pubsubTopics: PubsubTopic
|
||||
): { clusterId: ClusterId; shard: ShardId } => {
|
||||
const parts = pubsubTopics.split("/");
|
||||
|
||||
if (
|
||||
parts.length != 6 ||
|
||||
parts[1] !== "waku" ||
|
||||
parts[2] !== "2" ||
|
||||
parts[3] !== "rs"
|
||||
)
|
||||
throw new Error("Invalid pubsub topic");
|
||||
|
||||
const clusterId = parseInt(parts[4]);
|
||||
const shard = parseInt(parts[5]);
|
||||
|
||||
if (isNaN(clusterId) || isNaN(shard))
|
||||
throw new Error("Invalid clusterId or shard");
|
||||
|
||||
return {
|
||||
clusterId,
|
||||
shard
|
||||
};
|
||||
};
|
||||
|
||||
export type RoutingInfo = AutoShardingRoutingInfo | StaticShardingRoutingInfo;
|
||||
|
||||
export abstract class BaseRoutingInfo {
|
||||
public pubsubTopic: PubsubTopic;
|
||||
public shardId: ShardId;
|
||||
|
||||
protected constructor(
|
||||
public networkConfig: NetworkConfig,
|
||||
public pubsubTopic: PubsubTopic,
|
||||
public shardId: ShardId
|
||||
) {}
|
||||
pubsubTopic: PubsubTopic,
|
||||
shardId: ShardId
|
||||
) {
|
||||
this.pubsubTopic = pubsubTopic;
|
||||
this.shardId = shardId;
|
||||
}
|
||||
|
||||
public abstract get isAutoSharding(): boolean;
|
||||
public abstract get isStaticSharding(): boolean;
|
||||
public get clusterId(): ClusterId {
|
||||
return this.networkConfig.clusterId;
|
||||
}
|
||||
}
|
||||
|
||||
export class AutoShardingRoutingInfo
|
||||
@ -61,24 +170,12 @@ export class AutoShardingRoutingInfo
|
||||
*/
|
||||
private constructor(
|
||||
public networkConfig: AutoSharding,
|
||||
public pubsubTopic: PubsubTopic,
|
||||
public shardId: ShardId,
|
||||
pubsubTopic: PubsubTopic,
|
||||
shardId: ShardId,
|
||||
public contentTopic: string
|
||||
) {
|
||||
super(networkConfig, pubsubTopic, shardId);
|
||||
}
|
||||
|
||||
public get clusterId(): number {
|
||||
return this.networkConfig.clusterId;
|
||||
}
|
||||
|
||||
public get isAutoSharding(): boolean {
|
||||
return true;
|
||||
}
|
||||
|
||||
public get isStaticSharding(): boolean {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
export class StaticShardingRoutingInfo
|
||||
@ -127,35 +224,23 @@ export class StaticShardingRoutingInfo
|
||||
*/
|
||||
private constructor(
|
||||
public networkConfig: StaticSharding,
|
||||
public pubsubTopic: PubsubTopic,
|
||||
public shardId: ShardId
|
||||
pubsubTopic: PubsubTopic,
|
||||
shardId: ShardId
|
||||
) {
|
||||
super(networkConfig, pubsubTopic, shardId);
|
||||
}
|
||||
|
||||
public get clusterId(): ClusterId {
|
||||
return this.networkConfig.clusterId;
|
||||
}
|
||||
|
||||
public get isAutoSharding(): boolean {
|
||||
return false;
|
||||
}
|
||||
|
||||
public get isStaticSharding(): boolean {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
export function isAutoShardingRoutingInfo(
|
||||
routingInfo: BaseRoutingInfo
|
||||
): routingInfo is AutoShardingRoutingInfo {
|
||||
return routingInfo.isAutoSharding;
|
||||
return routingInfo instanceof AutoShardingRoutingInfo;
|
||||
}
|
||||
|
||||
export function isStaticShardingRoutingInfo(
|
||||
routingInfo: BaseRoutingInfo
|
||||
): routingInfo is StaticShardingRoutingInfo {
|
||||
return routingInfo.isStaticSharding;
|
||||
return routingInfo instanceof StaticShardingRoutingInfo;
|
||||
}
|
||||
|
||||
export function createRoutingInfo(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user