new store tests

This commit is contained in:
fbarbu15 2023-10-04 15:40:50 +03:00
parent 00cb2c6b28
commit 8fbb9c0c1b
No known key found for this signature in database
GPG Key ID: D75221C8DEA22501
7 changed files with 539 additions and 135 deletions

View File

@ -0,0 +1,147 @@
import { createCursor, DecodedMessage, DefaultPubSubTopic } from "@waku/core";
import type { LightNode } from "@waku/interfaces";
import { bytesToUtf8 } from "@waku/utils/bytes";
import { expect } from "chai";
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";
import {
customPubSubTopic,
sendMessages,
startAndConnectLightNode,
TestContentTopic,
TestDecoder,
totalMsgs
} from "./utils.js";
describe("Waku Store, cursor", function () {
this.timeout(15000);
let waku: LightNode;
let nwaku: NimGoNode;
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
await nwaku.ensureSubscriptions();
});
afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku], [waku]);
});
[
[2, 4],
[0, 20],
[10, 40],
[19, 20],
[19, 50],
[110, 120]
].forEach(([cursorIndex, messageCount]) => {
it(`Passing a valid cursor at ${cursorIndex} index when there are ${messageCount} messages`, async function () {
await sendMessages(
nwaku,
messageCount,
TestContentTopic,
DefaultPubSubTopic
);
waku = await startAndConnectLightNode(nwaku);
// messages in reversed order (first message at last index)
const messages: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder])) {
for await (const msg of page.reverse()) {
messages.push(msg as DecodedMessage);
}
}
// create cursor to extract messages after the cursorIndex
const cursor = await createCursor(messages[cursorIndex]);
// cursor.digest = new Uint8Array([]);
const messagesAfterCursor: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder], {
cursor
})) {
for await (const msg of page.reverse()) {
if (msg) {
messagesAfterCursor.push(msg as DecodedMessage);
}
}
}
expect(messages.length).be.eql(messageCount);
expect(messagesAfterCursor.length).be.eql(messageCount - cursorIndex - 1);
if (cursorIndex == messages.length - 1) {
// in this case the cursor will return nothin because it points at the end of the list
expect(messagesAfterCursor).be.eql([]);
} else {
expect(bytesToUtf8(messagesAfterCursor[0].payload)).to.be.eq(
bytesToUtf8(messages[cursorIndex + 1].payload)
);
expect(
bytesToUtf8(
messagesAfterCursor[messagesAfterCursor.length - 1].payload
)
).to.be.eq(bytesToUtf8(messages[messages.length - 1].payload));
}
});
});
it("Passing cursor with wrong message digest", async function () {
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
waku = await startAndConnectLightNode(nwaku);
const messages: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder])) {
for await (const msg of page.reverse()) {
messages.push(msg as DecodedMessage);
}
}
const cursor = await createCursor(messages[5]);
// setting a wrong digest
cursor.digest = new Uint8Array([]);
const messagesAfterCursor: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder], {
cursor
})) {
for await (const msg of page.reverse()) {
if (msg) {
messagesAfterCursor.push(msg as DecodedMessage);
}
}
}
expect(messagesAfterCursor.length).be.eql(0);
});
// Skipped because of strange results. Generator retrieves messages even if cursor is using a different customPubSubTopic.
// My guess is that pubsubTopic is not used. Need to confirm
it("Passing cursor with wrong pubSubTopic", async function () {
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
waku = await startAndConnectLightNode(nwaku);
const messages: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder])) {
for await (const msg of page.reverse()) {
messages.push(msg as DecodedMessage);
}
}
const cursor = await createCursor(messages[5], customPubSubTopic);
const messagesAfterCursor: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder], {
cursor
})) {
for await (const msg of page.reverse()) {
if (msg) {
messagesAfterCursor.push(msg as DecodedMessage);
}
}
}
expect(messagesAfterCursor.length).be.eql(0);
});
});

View File

@ -0,0 +1,224 @@
import { DefaultPubSubTopic } from "@waku/core";
import { IMessage, type LightNode } from "@waku/interfaces";
import { expect } from "chai";
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";
import {
customPubSubTopic,
customTestDecoder,
processQueriedMessages,
startAndConnectLightNode,
TestDecoder
} from "./utils.js";
describe("Waku Store, Error Handling", function () {
this.timeout(15000);
let waku: LightNode;
let nwaku: NimGoNode;
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
await nwaku.ensureSubscriptions();
waku = await startAndConnectLightNode(nwaku);
});
afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku], [waku]);
});
it("Query Generator, Wrong PubSubTopic", async function () {
try {
for await (const msgPromises of waku.store.queryGenerator([
customTestDecoder
])) {
msgPromises;
}
throw new Error("QueryGenerator was successful but was expected to fail");
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes(
`PubSub topic ${customPubSubTopic} has not been configured on this instance. Configured topics are: ${DefaultPubSubTopic}`
)
) {
throw err;
}
}
});
it("Query Generator, Multiple PubSubTopics", async function () {
try {
for await (const msgPromises of waku.store.queryGenerator([
TestDecoder,
customTestDecoder
])) {
msgPromises;
}
throw new Error("QueryGenerator was successful but was expected to fail");
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes(
"API does not support querying multiple pubsub topics at once"
)
) {
throw err;
}
}
});
it("Query Generator, No Decoder", async function () {
try {
for await (const msgPromises of waku.store.queryGenerator([])) {
msgPromises;
}
throw new Error("QueryGenerator was successful but was expected to fail");
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes("No decoders provided")
) {
throw err;
}
}
});
it("Query Generator, No message returned", async function () {
const messages = await processQueriedMessages(
waku,
[TestDecoder],
DefaultPubSubTopic
);
expect(messages?.length).eq(0);
});
it("Query with Ordered Callback, Wrong PubSubTopic", async function () {
try {
await waku.store.queryWithOrderedCallback(
[customTestDecoder],
async () => {}
);
throw new Error("QueryGenerator was successful but was expected to fail");
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes(
`PubSub topic ${customPubSubTopic} has not been configured on this instance. Configured topics are: ${DefaultPubSubTopic}`
)
) {
throw err;
}
}
});
it("Query with Ordered Callback, Multiple PubSubTopics", async function () {
try {
await waku.store.queryWithOrderedCallback(
[TestDecoder, customTestDecoder],
async () => {}
);
throw new Error("QueryGenerator was successful but was expected to fail");
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes(
"API does not support querying multiple pubsub topics at once"
)
) {
throw err;
}
}
});
it("Query with Ordered Callback, No Decoder", async function () {
try {
await waku.store.queryWithOrderedCallback([], async () => {});
throw new Error("QueryGenerator was successful but was expected to fail");
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes("No decoders provided")
) {
throw err;
}
}
});
it("Query with Ordered Callback, No message returned", async function () {
const messages: IMessage[] = [];
await waku.store.queryWithOrderedCallback([TestDecoder], async (msg) => {
messages.push(msg);
});
expect(messages?.length).eq(0);
});
it("Query with Promise Callback, Wrong PubSubTopic", async function () {
try {
await waku.store.queryWithPromiseCallback(
[customTestDecoder],
async () => {}
);
throw new Error("QueryGenerator was successful but was expected to fail");
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes(
`PubSub topic ${customPubSubTopic} has not been configured on this instance. Configured topics are: ${DefaultPubSubTopic}`
)
) {
throw err;
}
}
});
it("Query with Promise Callback, Multiple PubSubTopics", async function () {
try {
await waku.store.queryWithPromiseCallback(
[TestDecoder, customTestDecoder],
async () => {}
);
throw new Error("QueryGenerator was successful but was expected to fail");
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes(
"API does not support querying multiple pubsub topics at once"
)
) {
throw err;
}
}
});
it("Query with Promise Callback, No Decoder", async function () {
try {
await waku.store.queryWithPromiseCallback([], async () => {});
throw new Error("QueryGenerator was successful but was expected to fail");
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes("No decoders provided")
) {
throw err;
}
}
});
it("Query with Promise Callback, No message returned", async function () {
const messages: IMessage[] = [];
await waku.store.queryWithPromiseCallback(
[TestDecoder],
async (msgPromise) => {
const msg = await msgPromise;
if (msg) {
messages.push(msg);
}
}
);
expect(messages?.length).eq(0);
});
});

View File

@ -1,9 +1,7 @@
import {
createCursor,
createDecoder,
DecodedMessage,
DefaultPubSubTopic,
PageDirection,
waitForRemotePeer
} from "@waku/core";
import type { IMessage, LightNode } from "@waku/interfaces";
@ -36,7 +34,7 @@ import {
customContentTopic,
log,
messageText,
processMessages,
processQueriedMessages,
sendMessages,
startAndConnectLightNode,
TestContentTopic,
@ -45,6 +43,8 @@ import {
totalMsgs
} from "./utils.js";
const secondDecoder = createDecoder(customContentTopic, DefaultPubSubTopic);
describe("Waku Store", function () {
this.timeout(15000);
let waku: LightNode;
@ -55,6 +55,7 @@ describe("Waku Store", function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
await nwaku.ensureSubscriptions();
});
afterEach(async function () {
@ -65,7 +66,7 @@ describe("Waku Store", function () {
it("Query generator for multiple messages", async function () {
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
waku = await startAndConnectLightNode(nwaku);
const messages = await processMessages(
const messages = await processQueriedMessages(
waku,
[TestDecoder],
DefaultPubSubTopic
@ -96,7 +97,7 @@ describe("Waku Store", function () {
waku = await startAndConnectLightNode(nwaku);
const messageCollector = new MessageCollector();
messageCollector.list = await processMessages(
messageCollector.list = await processQueriedMessages(
waku,
[TestDecoder],
DefaultPubSubTopic
@ -127,10 +128,8 @@ describe("Waku Store", function () {
);
waku = await startAndConnectLightNode(nwaku);
const secondDecoder = createDecoder(customContentTopic, DefaultPubSubTopic);
const messageCollector = new MessageCollector();
messageCollector.list = await processMessages(
messageCollector.list = await processQueriedMessages(
waku,
[TestDecoder, secondDecoder],
DefaultPubSubTopic
@ -155,73 +154,17 @@ describe("Waku Store", function () {
waku = await startAndConnectLightNode(nwaku);
let localPromises: Promise<void>[] = [];
for (const testItem of TEST_STRING) {
for await (const msgPromises of waku.store.queryGenerator([
for await (const query of waku.store.queryGenerator([
createDecoder(testItem["value"])
])) {
const _promises = msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {
expect(
areUint8ArraysEqual(msg.payload, utf8ToBytes(messageText))
).to.eq(true);
}
});
localPromises = localPromises.concat(_promises);
}
await Promise.all(localPromises);
}
});
it("Query generator, no message returned", async function () {
waku = await startAndConnectLightNode(nwaku);
const messages = await processMessages(
waku,
[TestDecoder],
DefaultPubSubTopic
);
expect(messages?.length).eq(0);
});
it("Passing a cursor", async function () {
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
waku = await startAndConnectLightNode(nwaku);
const query = waku.store.queryGenerator([TestDecoder]);
// messages in reversed order (first message at last index)
const messages: DecodedMessage[] = [];
for await (const page of query) {
for await (const msg of page.reverse()) {
messages.push(msg as DecodedMessage);
for await (const msg of query) {
expect(
areUint8ArraysEqual(msg!.payload, utf8ToBytes(messageText))
).to.eq(true);
}
}
}
// index 2 would mean the third last message sent
const cursorIndex = 2;
// create cursor to extract messages after the 3rd index
const cursor = await createCursor(messages[cursorIndex]);
const messagesAfterCursor: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder], {
cursor
})) {
for await (const msg of page.reverse()) {
messagesAfterCursor.push(msg as DecodedMessage);
}
}
const testMessage = messagesAfterCursor[0];
expect(messages.length).be.eq(totalMsgs);
expect(bytesToUtf8(testMessage.payload)).to.be.eq(
bytesToUtf8(messages[cursorIndex + 1].payload)
);
});
it("Callback on promise", async function () {
@ -267,48 +210,6 @@ describe("Waku Store", function () {
expect(messages?.length).eq(desiredMsgs);
});
it("Ordered Callback - Forward", async function () {
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
waku = await startAndConnectLightNode(nwaku);
const messages: IMessage[] = [];
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
},
{
pageDirection: PageDirection.FORWARD
}
);
expect(messages?.length).eq(totalMsgs);
const payloads = messages.map((msg) => msg.payload[0]!);
expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys()));
});
it("Ordered Callback - Backward", async function () {
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
waku = await startAndConnectLightNode(nwaku);
let messages: IMessage[] = [];
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
},
{
pageDirection: PageDirection.BACKWARD
}
);
messages = messages.reverse();
expect(messages?.length).eq(totalMsgs);
const payloads = messages.map((msg) => msg.payload![0]!);
expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys()));
});
it("Generator, with asymmetric & symmetric encrypted messages", async function () {
const asymText = "This message is encrypted for me using asymmetric";
const asymTopic = "/test/1/asymmetric/proto";
@ -382,15 +283,14 @@ describe("Waku Store", function () {
const messages: DecodedMessage[] = [];
log("Retrieve messages from store");
for await (const msgPromises of waku2.store.queryGenerator([
for await (const query of waku2.store.queryGenerator([
eciesDecoder,
symDecoder,
TestDecoder
])) {
for (const promise of msgPromises) {
const msg = await promise;
for await (const msg of query) {
if (msg) {
messages.push(msg);
messages.push(msg as DecodedMessage);
}
}
}
@ -437,8 +337,6 @@ describe("Waku Store", function () {
).to.be.true;
}
waku = await startAndConnectLightNode(nwaku);
const firstMessages: IMessage[] = [];
await waku.store.queryWithOrderedCallback(
[TestDecoder],

View File

@ -14,7 +14,7 @@ import {
customContentTopic,
customPubSubTopic,
customTestDecoder,
processMessages,
processQueriedMessages,
sendMessages,
startAndConnectLightNode,
TestContentTopic,
@ -47,7 +47,7 @@ describe("Waku Store, custom pubsub topic", function () {
it("Generator, custom pubsub topic", async function () {
await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic);
waku = await startAndConnectLightNode(nwaku, [customPubSubTopic]);
const messages = await processMessages(
const messages = await processQueriedMessages(
waku,
[customTestDecoder],
customPubSubTopic
@ -72,7 +72,7 @@ describe("Waku Store, custom pubsub topic", function () {
DefaultPubSubTopic
]);
const customMessages = await processMessages(
const customMessages = await processQueriedMessages(
waku,
[customTestDecoder],
customPubSubTopic
@ -83,7 +83,7 @@ describe("Waku Store, custom pubsub topic", function () {
});
expect(result1).to.not.eq(-1);
const testMessages = await processMessages(
const testMessages = await processQueriedMessages(
waku,
[TestDecoder],
DefaultPubSubTopic
@ -128,12 +128,12 @@ describe("Waku Store, custom pubsub topic", function () {
customMessages.length != totalMsgs ||
testMessages.length != totalMsgs
) {
customMessages = await processMessages(
customMessages = await processQueriedMessages(
waku,
[customTestDecoder],
customPubSubTopic
);
testMessages = await processMessages(
testMessages = await processQueriedMessages(
waku,
[TestDecoder],
DefaultPubSubTopic

View File

@ -0,0 +1,129 @@
import { DecodedMessage, DefaultPubSubTopic, PageDirection } from "@waku/core";
import type { IMessage, LightNode } from "@waku/interfaces";
import { expect } from "chai";
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";
import {
chunkAndReverseArray,
sendMessages,
startAndConnectLightNode,
TestContentTopic,
TestDecoder,
totalMsgs
} from "./utils.js";
describe("Waku Store, order", function () {
this.timeout(15000);
let waku: LightNode;
let nwaku: NimGoNode;
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
await nwaku.ensureSubscriptions();
});
afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku], [waku]);
});
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
it(`Query Generator - ${pageDirection}`, async function () {
await sendMessages(
nwaku,
totalMsgs,
TestContentTopic,
DefaultPubSubTopic
);
waku = await startAndConnectLightNode(nwaku);
const messages: IMessage[] = [];
for await (const query of waku.store.queryGenerator([TestDecoder], {
pageDirection: pageDirection
})) {
for await (const msg of query) {
if (msg) {
messages.push(msg as DecodedMessage);
}
}
}
let expectedPayloads = Array.from(Array(totalMsgs).keys());
if (pageDirection === PageDirection.BACKWARD) {
expectedPayloads = chunkAndReverseArray(expectedPayloads, 10);
}
expect(messages?.length).eq(totalMsgs);
const payloads = messages.map((msg) => msg.payload[0]!);
expect(payloads).to.deep.eq(expectedPayloads);
});
});
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
it(`Promise Callback - ${pageDirection}`, async function () {
await sendMessages(
nwaku,
totalMsgs,
TestContentTopic,
DefaultPubSubTopic
);
waku = await startAndConnectLightNode(nwaku);
const messages: IMessage[] = [];
await waku.store.queryWithPromiseCallback(
[TestDecoder],
async (msgPromise) => {
const msg = await msgPromise;
if (msg) {
messages.push(msg);
}
},
{
pageDirection: pageDirection
}
);
let expectedPayloads = Array.from(Array(totalMsgs).keys());
if (pageDirection === PageDirection.BACKWARD) {
expectedPayloads = chunkAndReverseArray(expectedPayloads, 10);
}
expect(messages?.length).eq(totalMsgs);
const payloads = messages.map((msg) => msg.payload[0]!);
expect(payloads).to.deep.eq(expectedPayloads);
});
});
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
it(`Ordered Callback - ${pageDirection}`, async function () {
await sendMessages(
nwaku,
totalMsgs,
TestContentTopic,
DefaultPubSubTopic
);
waku = await startAndConnectLightNode(nwaku);
const messages: IMessage[] = [];
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
},
{
pageDirection: pageDirection
}
);
if (pageDirection === PageDirection.BACKWARD) {
messages.reverse();
}
expect(messages?.length).eq(totalMsgs);
const payloads = messages.map((msg) => msg.payload[0]!);
expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys()));
});
});
});

View File

@ -47,25 +47,20 @@ export async function sendMessages(
}
}
export async function processMessages(
export async function processQueriedMessages(
instance: LightNode,
decoders: Array<Decoder>,
expectedTopic: string
expectedTopic?: string
): Promise<DecodedMessage[]> {
const localMessages: DecodedMessage[] = [];
let localPromises: Promise<void>[] = [];
for await (const msgPromises of instance.store.queryGenerator(decoders)) {
const _promises = msgPromises.map(async (promise) => {
const msg = await promise;
for await (const query of instance.store.queryGenerator(decoders)) {
for await (const msg of query) {
if (msg) {
localMessages.push(msg);
expect(msg.pubSubTopic).to.eq(expectedTopic);
localMessages.push(msg as DecodedMessage);
}
});
localPromises = localPromises.concat(_promises);
}
}
await Promise.all(localPromises);
return localMessages;
}
@ -83,3 +78,14 @@ export async function startAndConnectLightNode(
log("Waku node created");
return waku;
}
export function chunkAndReverseArray(
arr: number[],
chunkSize: number
): number[] {
const result: number[] = [];
for (let i = 0; i < arr.length; i += chunkSize) {
result.push(...arr.slice(i, i + chunkSize).reverse());
}
return result.reverse();
}