mirror of https://github.com/waku-org/js-waku.git
add tests
This commit is contained in:
parent
f718a0fdad
commit
a814389773
|
@ -107,6 +107,11 @@ export class ServiceNodesFleet {
|
|||
message: MessageRpcQuery,
|
||||
pubsubTopic: string = DefaultPubsubTopic
|
||||
): Promise<boolean> {
|
||||
message = {
|
||||
...message,
|
||||
timestamp:
|
||||
message.timestamp || BigInt(new Date().valueOf()) * BigInt(1_000_000)
|
||||
};
|
||||
const relayMessagePromises: Promise<boolean>[] = this.nodes.map((node) =>
|
||||
node.sendMessage(message, pubsubTopic)
|
||||
);
|
||||
|
@ -141,7 +146,7 @@ class MultipleNodesMessageCollector {
|
|||
callback: (msg: DecodedMessage) => void = () => {};
|
||||
messageList: Array<DecodedMessage> = [];
|
||||
constructor(
|
||||
private messageCollectors: MessageCollector[],
|
||||
public messageCollectors: MessageCollector[],
|
||||
private relayNodes?: ServiceNode[],
|
||||
private strictChecking: boolean = false
|
||||
) {
|
||||
|
|
|
@ -0,0 +1,223 @@
|
|||
import { DecodedMessage } from "@waku/core";
|
||||
import type { LightNode } from "@waku/interfaces";
|
||||
import { bytesToUtf8 } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
afterEachCustom,
|
||||
beforeEachCustom,
|
||||
ServiceNodesFleet,
|
||||
tearDownNodes
|
||||
} from "../../src";
|
||||
|
||||
import {
|
||||
TestDecoder,
|
||||
TestDecoder2,
|
||||
TestShardInfo,
|
||||
totalMsgs
|
||||
} from "./single_node/utils";
|
||||
import {
|
||||
runMultipleNodes,
|
||||
sendMessagesToMultipleNodes,
|
||||
startAndConnectLightNodeWithMultipleServiceNodes
|
||||
} from "./utils";
|
||||
|
||||
describe("Waku Store: Multiple Nodes: cursor", function () {
|
||||
this.timeout(15000);
|
||||
let waku: LightNode;
|
||||
let waku2: LightNode;
|
||||
let serviceNodesFleet: ServiceNodesFleet;
|
||||
|
||||
beforeEachCustom(this, async () => {
|
||||
[serviceNodesFleet, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
|
||||
});
|
||||
|
||||
afterEachCustom(this, async () => {
|
||||
await tearDownNodes(serviceNodesFleet.nodes, [waku, waku2]);
|
||||
});
|
||||
|
||||
[
|
||||
[2, 4],
|
||||
[0, 20],
|
||||
[10, 40],
|
||||
[19, 20],
|
||||
[19, 50],
|
||||
[110, 120]
|
||||
].forEach(([cursorIndex, messageCount]) => {
|
||||
it(`Passing a valid cursor at ${cursorIndex} index when there are ${messageCount} messages`, async function () {
|
||||
await sendMessagesToMultipleNodes(
|
||||
serviceNodesFleet.nodes,
|
||||
messageCount,
|
||||
TestDecoder.contentTopic,
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
// messages in reversed order (first message at last index)
|
||||
const messages: DecodedMessage[] = [];
|
||||
for await (const page of waku.store.queryGenerator([TestDecoder])) {
|
||||
for await (const msg of page.reverse()) {
|
||||
messages.push(msg as DecodedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
// create cursor to extract messages after the cursorIndex
|
||||
const cursor = waku.store.createCursor(messages[cursorIndex]);
|
||||
|
||||
const messagesAfterCursor: DecodedMessage[] = [];
|
||||
for await (const page of waku.store.queryGenerator([TestDecoder], {
|
||||
cursor
|
||||
})) {
|
||||
for await (const msg of page.reverse()) {
|
||||
if (msg) {
|
||||
messagesAfterCursor.push(msg as DecodedMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
expect(messages.length).be.eql(messageCount);
|
||||
expect(messagesAfterCursor.length).be.eql(messageCount - cursorIndex - 1);
|
||||
if (cursorIndex == messages.length - 1) {
|
||||
// in this case the cursor will return nothin because it points at the end of the list
|
||||
expect(messagesAfterCursor).be.eql([]);
|
||||
} else {
|
||||
expect(bytesToUtf8(messagesAfterCursor[0].payload)).to.be.eq(
|
||||
bytesToUtf8(messages[cursorIndex + 1].payload)
|
||||
);
|
||||
expect(
|
||||
bytesToUtf8(
|
||||
messagesAfterCursor[messagesAfterCursor.length - 1].payload
|
||||
)
|
||||
).to.be.eq(bytesToUtf8(messages[messages.length - 1].payload));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it("Reusing cursor across nodes", async function () {
|
||||
await sendMessagesToMultipleNodes(
|
||||
serviceNodesFleet.nodes,
|
||||
totalMsgs,
|
||||
TestDecoder.contentTopic,
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
waku2 = await startAndConnectLightNodeWithMultipleServiceNodes(
|
||||
serviceNodesFleet.nodes,
|
||||
TestShardInfo
|
||||
);
|
||||
|
||||
// messages in reversed order (first message at last index)
|
||||
const messages: DecodedMessage[] = [];
|
||||
for await (const page of waku.store.queryGenerator([TestDecoder])) {
|
||||
for await (const msg of page.reverse()) {
|
||||
messages.push(msg as DecodedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
// create cursor to extract messages after the cursorIndex
|
||||
const cursor = waku.store.createCursor(messages[5]);
|
||||
|
||||
// query node2 with the cursor from node1
|
||||
const messagesAfterCursor: DecodedMessage[] = [];
|
||||
for await (const page of waku2.store.queryGenerator([TestDecoder], {
|
||||
cursor
|
||||
})) {
|
||||
for await (const msg of page.reverse()) {
|
||||
if (msg) {
|
||||
messagesAfterCursor.push(msg as DecodedMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
expect(messages.length).be.eql(totalMsgs);
|
||||
expect(messagesAfterCursor.length).be.eql(totalMsgs - 6);
|
||||
expect(bytesToUtf8(messagesAfterCursor[0].payload)).to.be.eq(
|
||||
bytesToUtf8(messages[6].payload)
|
||||
);
|
||||
expect(
|
||||
bytesToUtf8(messagesAfterCursor[messagesAfterCursor.length - 1].payload)
|
||||
).to.be.eq(bytesToUtf8(messages[messages.length - 1].payload));
|
||||
});
|
||||
|
||||
it("Passing cursor with wrong message digest", async function () {
|
||||
await sendMessagesToMultipleNodes(
|
||||
serviceNodesFleet.nodes,
|
||||
totalMsgs,
|
||||
TestDecoder.contentTopic,
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
const messages: DecodedMessage[] = [];
|
||||
for await (const page of waku.store.queryGenerator([TestDecoder])) {
|
||||
for await (const msg of page.reverse()) {
|
||||
messages.push(msg as DecodedMessage);
|
||||
}
|
||||
}
|
||||
const cursor = waku.store.createCursor(messages[5]);
|
||||
|
||||
// setting a wrong digest
|
||||
cursor.digest = new Uint8Array([]);
|
||||
|
||||
const messagesAfterCursor: DecodedMessage[] = [];
|
||||
try {
|
||||
for await (const page of waku.store.queryGenerator([TestDecoder], {
|
||||
cursor
|
||||
})) {
|
||||
for await (const msg of page.reverse()) {
|
||||
if (msg) {
|
||||
messagesAfterCursor.push(msg as DecodedMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Should return same as go-waku. Raised bug: https://github.com/waku-org/nwaku/issues/2117
|
||||
expect(messagesAfterCursor.length).to.eql(0);
|
||||
} catch (error) {
|
||||
for (const node of serviceNodesFleet.nodes) {
|
||||
if (
|
||||
node.type === "go-waku" &&
|
||||
typeof error === "string" &&
|
||||
error.includes("History response contains an Error: INVALID_CURSOR")
|
||||
) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
throw error instanceof Error
|
||||
? new Error(`Unexpected error: ${error.message}`)
|
||||
: error;
|
||||
}
|
||||
});
|
||||
|
||||
it("Passing cursor with wrong pubsubTopic", async function () {
|
||||
await sendMessagesToMultipleNodes(
|
||||
serviceNodesFleet.nodes,
|
||||
totalMsgs,
|
||||
TestDecoder.contentTopic,
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
const messages: DecodedMessage[] = [];
|
||||
for await (const page of waku.store.queryGenerator([TestDecoder])) {
|
||||
for await (const msg of page.reverse()) {
|
||||
messages.push(msg as DecodedMessage);
|
||||
}
|
||||
}
|
||||
messages[5].pubsubTopic = TestDecoder2.pubsubTopic;
|
||||
const cursor = waku.store.createCursor(messages[5]);
|
||||
|
||||
try {
|
||||
for await (const page of waku.store.queryGenerator([TestDecoder], {
|
||||
cursor
|
||||
})) {
|
||||
void page;
|
||||
}
|
||||
throw new Error("Cursor with wrong pubsubtopic was accepted");
|
||||
} catch (err) {
|
||||
if (
|
||||
!(err instanceof Error) ||
|
||||
!err.message.includes(
|
||||
`Cursor pubsub topic (${TestDecoder2.pubsubTopic}) does not match decoder pubsub topic (${TestDecoder.pubsubTopic})`
|
||||
)
|
||||
) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
|
@ -0,0 +1,225 @@
|
|||
import { createDecoder } from "@waku/core";
|
||||
import { IMessage, type LightNode } from "@waku/interfaces";
|
||||
import { determinePubsubTopic } from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
afterEachCustom,
|
||||
beforeEachCustom,
|
||||
ServiceNodesFleet,
|
||||
tearDownNodes
|
||||
} from "../../src";
|
||||
|
||||
import {
|
||||
processQueriedMessages,
|
||||
TestContentTopic1,
|
||||
TestDecoder,
|
||||
TestDecoder2,
|
||||
TestShardInfo
|
||||
} from "./single_node/utils";
|
||||
import { runMultipleNodes } from "./utils";
|
||||
|
||||
describe("Waku Store: Multiple Peers: error handling", function () {
|
||||
this.timeout(15000);
|
||||
let waku: LightNode;
|
||||
let ServiceNodesFleet: ServiceNodesFleet;
|
||||
|
||||
beforeEachCustom(this, async () => {
|
||||
[ServiceNodesFleet, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
|
||||
});
|
||||
|
||||
afterEachCustom(this, async () => {
|
||||
await tearDownNodes(ServiceNodesFleet.nodes, waku);
|
||||
});
|
||||
|
||||
it("Query Generator, Wrong PubsubTopic", async function () {
|
||||
const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic");
|
||||
|
||||
try {
|
||||
for await (const msgPromises of waku.store.queryGenerator([
|
||||
wrongDecoder
|
||||
])) {
|
||||
void msgPromises;
|
||||
}
|
||||
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||
} catch (err) {
|
||||
if (
|
||||
!(err instanceof Error) ||
|
||||
!err.message.includes(
|
||||
`Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance. Configured topics are: ${TestDecoder.pubsubTopic}`
|
||||
)
|
||||
) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it("Query Generator, Multiple PubsubTopics", async function () {
|
||||
try {
|
||||
for await (const msgPromises of waku.store.queryGenerator([
|
||||
TestDecoder,
|
||||
TestDecoder2
|
||||
])) {
|
||||
void msgPromises;
|
||||
}
|
||||
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||
} catch (err) {
|
||||
if (
|
||||
!(err instanceof Error) ||
|
||||
!err.message.includes(
|
||||
"API does not support querying multiple pubsub topics at once"
|
||||
)
|
||||
) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it("Query Generator, No Decoder", async function () {
|
||||
try {
|
||||
for await (const msgPromises of waku.store.queryGenerator([])) {
|
||||
void msgPromises;
|
||||
}
|
||||
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||
} catch (err) {
|
||||
if (
|
||||
!(err instanceof Error) ||
|
||||
!err.message.includes("No decoders provided")
|
||||
) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it("Query Generator, No message returned", async function () {
|
||||
const WrongTestPubsubTopic = determinePubsubTopic("/test/1/wrong/utf8");
|
||||
const messages = await processQueriedMessages(
|
||||
waku,
|
||||
[TestDecoder],
|
||||
WrongTestPubsubTopic
|
||||
);
|
||||
expect(messages?.length).eq(0);
|
||||
});
|
||||
|
||||
it("Query with Ordered Callback, Wrong PubsubTopic", async function () {
|
||||
const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic");
|
||||
try {
|
||||
await waku.store.queryWithOrderedCallback([wrongDecoder], async () => {});
|
||||
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||
} catch (err) {
|
||||
if (
|
||||
!(err instanceof Error) ||
|
||||
!err.message.includes(
|
||||
`Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance. Configured topics are: ${TestDecoder.pubsubTopic}`
|
||||
)
|
||||
) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it("Query with Ordered Callback, Multiple PubsubTopics", async function () {
|
||||
try {
|
||||
await waku.store.queryWithOrderedCallback(
|
||||
[TestDecoder, TestDecoder2],
|
||||
async () => {}
|
||||
);
|
||||
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||
} catch (err) {
|
||||
if (
|
||||
!(err instanceof Error) ||
|
||||
!err.message.includes(
|
||||
"API does not support querying multiple pubsub topics at once"
|
||||
)
|
||||
) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it("Query with Ordered Callback, No Decoder", async function () {
|
||||
try {
|
||||
await waku.store.queryWithOrderedCallback([], async () => {});
|
||||
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||
} catch (err) {
|
||||
if (
|
||||
!(err instanceof Error) ||
|
||||
!err.message.includes("No decoders provided")
|
||||
) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it("Query with Ordered Callback, No message returned", async function () {
|
||||
const messages: IMessage[] = [];
|
||||
await waku.store.queryWithOrderedCallback([TestDecoder], async (msg) => {
|
||||
messages.push(msg);
|
||||
});
|
||||
expect(messages?.length).eq(0);
|
||||
});
|
||||
|
||||
it("Query with Promise Callback, Wrong PubsubTopic", async function () {
|
||||
const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic");
|
||||
try {
|
||||
await waku.store.queryWithPromiseCallback([wrongDecoder], async () => {});
|
||||
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||
} catch (err) {
|
||||
if (
|
||||
!(err instanceof Error) ||
|
||||
!err.message.includes(
|
||||
`Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance. Configured topics are: ${TestDecoder.pubsubTopic}`
|
||||
)
|
||||
) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it("Query with Promise Callback, Multiple PubsubTopics", async function () {
|
||||
try {
|
||||
await waku.store.queryWithPromiseCallback(
|
||||
[TestDecoder, TestDecoder2],
|
||||
async () => {}
|
||||
);
|
||||
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||
} catch (err) {
|
||||
if (
|
||||
!(err instanceof Error) ||
|
||||
!err.message.includes(
|
||||
"API does not support querying multiple pubsub topics at once"
|
||||
)
|
||||
) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it("Query with Promise Callback, No Decoder", async function () {
|
||||
try {
|
||||
await waku.store.queryWithPromiseCallback([], async () => {});
|
||||
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||
} catch (err) {
|
||||
if (
|
||||
!(err instanceof Error) ||
|
||||
!err.message.includes("No decoders provided")
|
||||
) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it("Query with Promise Callback, No message returned", async function () {
|
||||
const messages: IMessage[] = [];
|
||||
await waku.store.queryWithPromiseCallback(
|
||||
[TestDecoder],
|
||||
async (msgPromise) => {
|
||||
const msg = await msgPromise;
|
||||
if (msg) {
|
||||
messages.push(msg);
|
||||
}
|
||||
}
|
||||
);
|
||||
expect(messages?.length).eq(0);
|
||||
});
|
||||
});
|
|
@ -0,0 +1,371 @@
|
|||
import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core";
|
||||
import type { IMessage, LightNode } from "@waku/interfaces";
|
||||
import { Protocols } from "@waku/interfaces";
|
||||
import {
|
||||
generatePrivateKey,
|
||||
generateSymmetricKey,
|
||||
getPublicKey
|
||||
} from "@waku/message-encryption";
|
||||
import {
|
||||
createDecoder as createEciesDecoder,
|
||||
createEncoder as createEciesEncoder
|
||||
} from "@waku/message-encryption/ecies";
|
||||
import {
|
||||
createDecoder as createSymDecoder,
|
||||
createEncoder as createSymEncoder
|
||||
} from "@waku/message-encryption/symmetric";
|
||||
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
import { equals } from "uint8arrays/equals";
|
||||
|
||||
import {
|
||||
afterEachCustom,
|
||||
beforeEachCustom,
|
||||
delay,
|
||||
ServiceNode,
|
||||
ServiceNodesFleet,
|
||||
tearDownNodes,
|
||||
TEST_STRING
|
||||
} from "../../src";
|
||||
|
||||
import {
|
||||
messageText,
|
||||
processQueriedMessages,
|
||||
TestContentTopic1,
|
||||
TestDecoder,
|
||||
TestDecoder2,
|
||||
TestEncoder,
|
||||
TestPubsubTopic1,
|
||||
TestShardInfo,
|
||||
totalMsgs
|
||||
} from "./single_node/utils";
|
||||
import {
|
||||
runMultipleNodes,
|
||||
sendMessagesToMultipleNodes,
|
||||
startAndConnectLightNodeWithMultipleServiceNodes
|
||||
} from "./utils";
|
||||
|
||||
describe.only("Waku Store, general", function () {
|
||||
this.timeout(15000);
|
||||
let waku: LightNode;
|
||||
let waku2: LightNode;
|
||||
let serviceNodesFleet: ServiceNodesFleet;
|
||||
|
||||
beforeEachCustom(this, async () => {
|
||||
[serviceNodesFleet, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
|
||||
});
|
||||
|
||||
afterEachCustom(this, async () => {
|
||||
await tearDownNodes(serviceNodesFleet.nodes, [waku, waku2]);
|
||||
});
|
||||
|
||||
it("Query generator for multiple messages", async function () {
|
||||
await sendMessagesToMultipleNodes(
|
||||
serviceNodesFleet.nodes,
|
||||
totalMsgs,
|
||||
TestDecoder.contentTopic,
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
const messages = await processQueriedMessages(
|
||||
waku,
|
||||
[TestDecoder],
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
expect(messages?.length).eq(totalMsgs);
|
||||
|
||||
// checking that the message with text 0 exists
|
||||
const result = messages?.findIndex((msg) => {
|
||||
return msg.payload[0]! === 0;
|
||||
});
|
||||
expect(result).to.not.eq(-1);
|
||||
});
|
||||
|
||||
it.only("Query generator for multiple messages with different message text format", async function () {
|
||||
for (const testItem of TEST_STRING) {
|
||||
expect(
|
||||
await serviceNodesFleet.sendRelayMessage(
|
||||
ServiceNode.toMessageRpcQuery({
|
||||
payload: utf8ToBytes(testItem["value"]),
|
||||
contentTopic: TestDecoder.contentTopic
|
||||
}),
|
||||
TestDecoder.pubsubTopic
|
||||
)
|
||||
).to.eq(true);
|
||||
await delay(1); // to ensure each timestamp is unique.
|
||||
}
|
||||
|
||||
serviceNodesFleet.messageCollector.messageList =
|
||||
await processQueriedMessages(
|
||||
waku,
|
||||
[TestDecoder],
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
serviceNodesFleet.messageCollector.messageCollectors.forEach(
|
||||
(messageCollector) =>
|
||||
(messageCollector.list = serviceNodesFleet.messageCollector.messageList)
|
||||
);
|
||||
|
||||
// checking that all message sent were retrieved
|
||||
TEST_STRING.forEach((testItem) => {
|
||||
expect(
|
||||
serviceNodesFleet.messageCollector.hasMessage(
|
||||
TestDecoder.contentTopic,
|
||||
testItem["value"]
|
||||
)
|
||||
).to.eq(true);
|
||||
});
|
||||
});
|
||||
|
||||
it("Query generator for multiple messages with multiple decoders", async function () {
|
||||
const SecondDecoder = createDecoder(
|
||||
TestDecoder2.contentTopic,
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
await serviceNodesFleet.sendRelayMessage(
|
||||
ServiceNode.toMessageRpcQuery({
|
||||
payload: utf8ToBytes("M1"),
|
||||
contentTopic: TestDecoder.contentTopic
|
||||
}),
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
await serviceNodesFleet.sendRelayMessage(
|
||||
ServiceNode.toMessageRpcQuery({
|
||||
payload: utf8ToBytes("M2"),
|
||||
contentTopic: SecondDecoder.contentTopic
|
||||
}),
|
||||
SecondDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
serviceNodesFleet.messageCollector.messageList =
|
||||
await processQueriedMessages(
|
||||
waku,
|
||||
[TestDecoder, SecondDecoder],
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
expect(
|
||||
serviceNodesFleet.messageCollector.hasMessage(
|
||||
TestDecoder.contentTopic,
|
||||
"M1"
|
||||
)
|
||||
).to.eq(true);
|
||||
expect(
|
||||
serviceNodesFleet.messageCollector.hasMessage(
|
||||
SecondDecoder.contentTopic,
|
||||
"M2"
|
||||
)
|
||||
).to.eq(true);
|
||||
});
|
||||
|
||||
it("Query generator for multiple messages with different content topic format", async function () {
|
||||
for (const testItem of TEST_STRING) {
|
||||
expect(
|
||||
await serviceNodesFleet.sendRelayMessage(
|
||||
ServiceNode.toMessageRpcQuery({
|
||||
payload: utf8ToBytes(messageText),
|
||||
contentTopic: testItem["value"]
|
||||
}),
|
||||
TestDecoder.pubsubTopic
|
||||
)
|
||||
).to.eq(true);
|
||||
await delay(1); // to ensure each timestamp is unique.
|
||||
}
|
||||
|
||||
for (const testItem of TEST_STRING) {
|
||||
for await (const query of waku.store.queryGenerator([
|
||||
createDecoder(testItem["value"], TestDecoder.pubsubTopic)
|
||||
])) {
|
||||
for await (const msg of query) {
|
||||
expect(equals(msg!.payload, utf8ToBytes(messageText))).to.eq(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it("Callback on promise", async function () {
|
||||
await sendMessagesToMultipleNodes(
|
||||
serviceNodesFleet.nodes,
|
||||
totalMsgs,
|
||||
TestDecoder.contentTopic,
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
const messages: IMessage[] = [];
|
||||
await waku.store.queryWithPromiseCallback(
|
||||
[TestDecoder],
|
||||
async (msgPromise) => {
|
||||
const msg = await msgPromise;
|
||||
if (msg) {
|
||||
messages.push(msg);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
expect(messages?.length).eq(totalMsgs);
|
||||
const result = messages?.findIndex((msg) => {
|
||||
return msg.payload[0]! === 0;
|
||||
});
|
||||
expect(result).to.not.eq(-1);
|
||||
});
|
||||
|
||||
it("Callback on promise, aborts when callback returns true", async function () {
|
||||
await sendMessagesToMultipleNodes(
|
||||
serviceNodesFleet.nodes,
|
||||
totalMsgs,
|
||||
TestDecoder.contentTopic,
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
const desiredMsgs = 14;
|
||||
const messages: IMessage[] = [];
|
||||
await waku.store.queryWithPromiseCallback(
|
||||
[TestDecoder],
|
||||
async (msgPromise) => {
|
||||
const msg = await msgPromise;
|
||||
if (msg) {
|
||||
messages.push(msg);
|
||||
}
|
||||
return messages.length >= desiredMsgs;
|
||||
},
|
||||
{ pageSize: 7 }
|
||||
);
|
||||
|
||||
expect(messages?.length).eq(desiredMsgs);
|
||||
});
|
||||
|
||||
it("Generator, with asymmetric & symmetric encrypted messages", async function () {
|
||||
const asymText = "This message is encrypted for me using asymmetric";
|
||||
const asymTopic = "/test/1/asymmetric/proto";
|
||||
const symText =
|
||||
"This message is encrypted for me using symmetric encryption";
|
||||
const symTopic = "/test/1/symmetric/proto";
|
||||
const clearText = "This is a clear text message for everyone to read";
|
||||
const otherText =
|
||||
"This message is not for and I must not be able to read it";
|
||||
|
||||
const timestamp = new Date();
|
||||
|
||||
const asymMsg = { payload: utf8ToBytes(asymText), timestamp };
|
||||
const symMsg = {
|
||||
payload: utf8ToBytes(symText),
|
||||
timestamp: new Date(timestamp.valueOf() + 1)
|
||||
};
|
||||
const clearMsg = {
|
||||
payload: utf8ToBytes(clearText),
|
||||
timestamp: new Date(timestamp.valueOf() + 2)
|
||||
};
|
||||
const otherMsg = {
|
||||
payload: utf8ToBytes(otherText),
|
||||
timestamp: new Date(timestamp.valueOf() + 3)
|
||||
};
|
||||
|
||||
const privateKey = generatePrivateKey();
|
||||
const symKey = generateSymmetricKey();
|
||||
const publicKey = getPublicKey(privateKey);
|
||||
|
||||
const eciesEncoder = createEciesEncoder({
|
||||
contentTopic: asymTopic,
|
||||
publicKey,
|
||||
pubsubTopic: TestPubsubTopic1
|
||||
});
|
||||
const symEncoder = createSymEncoder({
|
||||
contentTopic: symTopic,
|
||||
symKey,
|
||||
pubsubTopic: TestPubsubTopic1
|
||||
});
|
||||
|
||||
const otherEncoder = createEciesEncoder({
|
||||
contentTopic: TestContentTopic1,
|
||||
pubsubTopic: TestPubsubTopic1,
|
||||
publicKey: getPublicKey(generatePrivateKey())
|
||||
});
|
||||
|
||||
const eciesDecoder = createEciesDecoder(
|
||||
asymTopic,
|
||||
privateKey,
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
const symDecoder = createSymDecoder(
|
||||
symTopic,
|
||||
symKey,
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
waku2 = await startAndConnectLightNodeWithMultipleServiceNodes(
|
||||
serviceNodesFleet.nodes,
|
||||
TestShardInfo
|
||||
);
|
||||
|
||||
await Promise.all([
|
||||
waku.lightPush.send(eciesEncoder, asymMsg),
|
||||
waku.lightPush.send(symEncoder, symMsg),
|
||||
waku.lightPush.send(otherEncoder, otherMsg),
|
||||
waku.lightPush.send(TestEncoder, clearMsg)
|
||||
]);
|
||||
|
||||
await waitForRemotePeer(waku2, [Protocols.Store]);
|
||||
|
||||
const messages: DecodedMessage[] = [];
|
||||
|
||||
for await (const query of waku2.store.queryGenerator([
|
||||
eciesDecoder,
|
||||
symDecoder,
|
||||
TestDecoder
|
||||
])) {
|
||||
for await (const msg of query) {
|
||||
if (msg) {
|
||||
messages.push(msg as DecodedMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Messages are ordered from oldest to latest within a page (1 page query)
|
||||
expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText);
|
||||
expect(bytesToUtf8(messages[1].payload!)).to.eq(symText);
|
||||
expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText);
|
||||
expect(messages?.length).eq(3);
|
||||
});
|
||||
|
||||
it("Ordered callback, aborts when callback returns true", async function () {
|
||||
await sendMessagesToMultipleNodes(
|
||||
serviceNodesFleet.nodes,
|
||||
totalMsgs,
|
||||
TestDecoder.contentTopic,
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
const desiredMsgs = 14;
|
||||
const messages: IMessage[] = [];
|
||||
await waku.store.queryWithOrderedCallback(
|
||||
[TestDecoder],
|
||||
async (msg) => {
|
||||
messages.push(msg);
|
||||
return messages.length >= desiredMsgs;
|
||||
},
|
||||
{ pageSize: 7 }
|
||||
);
|
||||
|
||||
expect(messages?.length).eq(desiredMsgs);
|
||||
});
|
||||
|
||||
it("Query generator for 2000 messages", async function () {
|
||||
this.timeout(40000);
|
||||
await sendMessagesToMultipleNodes(
|
||||
serviceNodesFleet.nodes,
|
||||
2000,
|
||||
TestDecoder.contentTopic,
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
const messages = await processQueriedMessages(
|
||||
waku,
|
||||
[TestDecoder],
|
||||
TestDecoder.pubsubTopic
|
||||
);
|
||||
|
||||
expect(messages?.length).eq(2000);
|
||||
});
|
||||
});
|
Loading…
Reference in New Issue