danisharora099 2bc3735e4d
feat: add support for autosharded pubsub topics
tests: use a generator for sharded pubsub topics
set pubsub topic in encoder/decoder based on sharding type
add function for grouping content topics by pubsub topic
add autosharding config to create options
add autoshard rpc endpoints to nwaku and use in tests
set autoshard pubsub topics in all protocols
fix rebase with static sharding
removes unused function
remove console logs
remove autosharding from ShardInfo, add to EncoderOptions
fix enr and encoder/decoder options
test that same application/version hashes to same shard index
update comment on shard field
fix spelling of autosharding
fix content topic protocol in tests
add sharding type alias and function to determine topic in encoders/decoders
move DefaultPubsubTopic from core to interfaces
2023-12-21 10:03:22 -08:00

139 lines
3.7 KiB
TypeScript

import {
createDecoder,
createEncoder,
DecodedMessage,
Decoder,
waitForRemotePeer
} from "@waku/core";
import {
DefaultPubsubTopic,
LightNode,
Protocols,
ShardInfo,
ShardingParams
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
import { expect } from "chai";
import { delay, NimGoNode, NOISE_KEY_1 } from "../../src";
export const log = new Logger("test:store");
export const TestContentTopic = "/test/1/waku-store/utf8";
export const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
export const TestDecoder = createDecoder(TestContentTopic);
export const customShardedPubsubTopic1 = singleShardInfoToPubsubTopic({
clusterId: 3,
shard: 1
});
export const customShardedPubsubTopic2 = singleShardInfoToPubsubTopic({
clusterId: 3,
shard: 2
});
export const shardInfo1: ShardInfo = { clusterId: 3, shards: [1] };
export const customContentTopic1 = "/test/2/waku-store/utf8";
export const customContentTopic2 = "/test/3/waku-store/utf8";
export const customDecoder1 = createDecoder(customContentTopic1, {
clusterId: 3,
shard: 1
});
export const customDecoder2 = createDecoder(customContentTopic2, {
clusterId: 3,
shard: 2
});
export const shardInfoBothShards: ShardInfo = { clusterId: 3, shards: [1, 2] };
export const totalMsgs = 20;
export const messageText = "Store Push works!";
export async function sendMessages(
instance: NimGoNode,
numMessages: number,
contentTopic: string,
pubsubTopic: string
): Promise<void> {
for (let i = 0; i < numMessages; i++) {
expect(
await instance.sendMessage(
NimGoNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: contentTopic
}),
pubsubTopic
)
).to.eq(true);
await delay(1); // to ensure each timestamp is unique.
}
}
export async function sendMessagesAutosharding(
instance: NimGoNode,
numMessages: number,
contentTopic: string
): Promise<void> {
for (let i = 0; i < numMessages; i++) {
expect(
await instance.sendMessageAutosharding(
NimGoNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: contentTopic
})
)
).to.eq(true);
await delay(1); // to ensure each timestamp is unique.
}
}
export async function processQueriedMessages(
instance: LightNode,
decoders: Array<Decoder>,
expectedTopic?: string
): Promise<DecodedMessage[]> {
const localMessages: DecodedMessage[] = [];
for await (const query of instance.store.queryGenerator(decoders)) {
for await (const msg of query) {
if (msg) {
expect(msg.pubsubTopic).to.eq(expectedTopic);
localMessages.push(msg as DecodedMessage);
}
}
}
return localMessages;
}
export async function startAndConnectLightNode(
instance: NimGoNode,
pubsubTopics: string[] = [DefaultPubsubTopic],
shardInfo?: ShardingParams
): Promise<LightNode> {
const waku = await createLightNode({
...((pubsubTopics.length !== 1 ||
pubsubTopics[0] !== DefaultPubsubTopic) && {
shardInfo: shardInfo
}),
staticNoiseKey: NOISE_KEY_1
});
await waku.start();
await waku.dial(await instance.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
log.info("Waku node created");
return waku;
}
export function chunkAndReverseArray(
arr: number[],
chunkSize: number
): number[] {
const result: number[] = [];
for (let i = 0; i < arr.length; i += chunkSize) {
result.push(...arr.slice(i, i + chunkSize).reverse());
}
return result.reverse();
}
export const adjustDate = (baseDate: Date, adjustMs: number): Date => {
const adjusted = new Date(baseDate);
adjusted.setTime(adjusted.getTime() + adjustMs);
return adjusted;
};