new store tests

This commit is contained in:
fbarbu15 2023-10-05 19:06:37 +03:00
parent 6c35a51f67
commit c52b895c1e
No known key found for this signature in database
GPG Key ID: D75221C8DEA22501
6 changed files with 312 additions and 70 deletions

View File

@ -12,7 +12,7 @@ import {
TestDecoder TestDecoder
} from "./utils.js"; } from "./utils.js";
describe("Waku Store, Error Handling", function () { describe("Waku Store, error handling", function () {
this.timeout(15000); this.timeout(15000);
let waku: LightNode; let waku: LightNode;
let nwaku: NimGoNode; let nwaku: NimGoNode;

View File

@ -302,75 +302,6 @@ describe("Waku Store", function () {
expect(messages?.length).eq(3); expect(messages?.length).eq(3);
}); });
it("Ordered callback, using start and end time", async function () {
const now = new Date();
const startTime = new Date();
// Set start time 15 seconds in the past
startTime.setTime(now.getTime() - 15 * 1000);
const message1Timestamp = new Date();
// Set first message was 10 seconds in the past
message1Timestamp.setTime(now.getTime() - 10 * 1000);
const message2Timestamp = new Date();
// Set second message 2 seconds in the past
message2Timestamp.setTime(now.getTime() - 2 * 1000);
const messageTimestamps = [message1Timestamp, message2Timestamp];
const endTime = new Date();
// Set end time 1 second in the past
endTime.setTime(now.getTime() - 1000);
await sendMessages(nwaku, 2, TestContentTopic, DefaultPubSubTopic);
waku = await startAndConnectLightNode(nwaku);
for (let i = 0; i < 2; i++) {
expect(
await nwaku.sendMessage(
NimGoNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: TestContentTopic,
timestamp: messageTimestamps[i]
})
)
).to.be.true;
}
const firstMessages: IMessage[] = [];
await waku.store.queryWithOrderedCallback(
[TestDecoder],
(msg) => {
if (msg) {
firstMessages.push(msg);
}
},
{
timeFilter: { startTime, endTime: message1Timestamp }
}
);
const bothMessages: IMessage[] = [];
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
bothMessages.push(msg);
},
{
timeFilter: {
startTime,
endTime
}
}
);
expect(firstMessages?.length).eq(1);
expect(firstMessages[0].payload![0]!).eq(0);
expect(bothMessages?.length).eq(2);
});
it("Ordered callback, aborts when callback returns true", async function () { it("Ordered callback, aborts when callback returns true", async function () {
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic); await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
waku = await startAndConnectLightNode(nwaku); waku = await startAndConnectLightNode(nwaku);
@ -388,4 +319,17 @@ describe("Waku Store", function () {
expect(messages?.length).eq(desiredMsgs); expect(messages?.length).eq(desiredMsgs);
}); });
it("Query generator for 2000 messages", async function () {
this.timeout(40000);
await sendMessages(nwaku, 2000, TestContentTopic, DefaultPubSubTopic);
waku = await startAndConnectLightNode(nwaku);
const messages = await processQueriedMessages(
waku,
[TestDecoder],
DefaultPubSubTopic
);
expect(messages?.length).eq(2000);
});
}); });

View File

@ -0,0 +1,94 @@
import { DefaultPubSubTopic } from "@waku/core";
import type { LightNode } from "@waku/interfaces";
import { expect } from "chai";
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";
import {
sendMessages,
startAndConnectLightNode,
TestContentTopic,
TestDecoder
} from "./utils.js";
describe("Waku Store, page size", function () {
this.timeout(15000);
let waku: LightNode;
let nwaku: NimGoNode;
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
await nwaku.ensureSubscriptions();
});
afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku], [waku]);
});
[
[0, 30],
[1, 4],
[3, 20],
[10, 10],
[11, 10],
[19, 20],
[110, 120]
].forEach(([pageSize, messageCount]) => {
it(`Passing page size ${pageSize} when there are ${messageCount} messages`, async function () {
await sendMessages(
nwaku,
messageCount,
TestContentTopic,
DefaultPubSubTopic
);
// Determine effectivePageSize for test expectations
let effectivePageSize = pageSize;
if (pageSize === 0) {
effectivePageSize = 20;
} else if (pageSize > 100) {
effectivePageSize = 100;
}
waku = await startAndConnectLightNode(nwaku);
let messagesRetrieved = 0;
for await (const query of waku.store.queryGenerator([TestDecoder], {
pageSize: pageSize
})) {
// Calculate expected page size
const expectedPageSize = Math.min(
effectivePageSize,
messageCount - messagesRetrieved
);
expect(query.length).eq(expectedPageSize);
for await (const msg of query) {
if (msg) {
messagesRetrieved++;
}
}
}
expect(messagesRetrieved).eq(messageCount);
});
});
it("Default pageSize", async function () {
await sendMessages(nwaku, 20, TestContentTopic, DefaultPubSubTopic);
waku = await startAndConnectLightNode(nwaku);
let messagesRetrieved = 0;
for await (const query of waku.store.queryGenerator([TestDecoder])) {
expect(query.length).eq(10);
for await (const msg of query) {
if (msg) {
messagesRetrieved++;
}
}
}
expect(messagesRetrieved).eq(20);
});
});

View File

@ -0,0 +1,110 @@
import { DecodedMessage, DefaultPubSubTopic, PageDirection } from "@waku/core";
import type { IMessage, LightNode } from "@waku/interfaces";
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";
import {
sendMessages,
startAndConnectLightNode,
TestContentTopic,
TestDecoder,
totalMsgs
} from "./utils.js";
describe("Waku Store, sorting", function () {
this.timeout(15000);
let waku: LightNode;
let nwaku: NimGoNode;
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
await nwaku.ensureSubscriptions();
});
afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku], [waku]);
});
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
it(`Query Generator sorting by timestamp while page direction is ${pageDirection}`, async function () {
await sendMessages(
nwaku,
totalMsgs,
TestContentTopic,
DefaultPubSubTopic
);
waku = await startAndConnectLightNode(nwaku);
for await (const query of waku.store.queryGenerator([TestDecoder], {
pageDirection: PageDirection.FORWARD
})) {
const page: IMessage[] = [];
for await (const msg of query) {
if (msg) {
page.push(msg as DecodedMessage);
}
}
// Extract timestamps
const timestamps = page.map(
(msg) => msg.timestamp as unknown as bigint
);
// Check if timestamps are sorted
for (let i = 1; i < timestamps.length; i++) {
if (timestamps[i] < timestamps[i - 1]) {
throw new Error(
`Messages are not sorted by timestamp. Found out of order at index ${i}`
);
}
}
}
});
});
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
it(`Ordered Callback sorting by timestamp while page direction is ${pageDirection}`, async function () {
await sendMessages(
nwaku,
totalMsgs,
TestContentTopic,
DefaultPubSubTopic
);
waku = await startAndConnectLightNode(nwaku);
const messages: IMessage[] = [];
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
},
{
pageDirection: pageDirection
}
);
// Extract timestamps
const timestamps = messages.map(
(msg) => msg.timestamp as unknown as bigint
);
// Check if timestamps are sorted
for (let i = 1; i < timestamps.length; i++) {
if (
pageDirection === PageDirection.FORWARD &&
timestamps[i] < timestamps[i - 1]
) {
throw new Error(
`Messages are not sorted by timestamp in FORWARD direction. Found out of order at index ${i}`
);
} else if (
pageDirection === PageDirection.BACKWARD &&
timestamps[i] > timestamps[i - 1]
) {
throw new Error(
`Messages are not sorted by timestamp in BACKWARD direction. Found out of order at index ${i}`
);
}
}
});
});
});

View File

@ -0,0 +1,88 @@
import type { IMessage, LightNode } from "@waku/interfaces";
import { expect } from "chai";
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";
import {
adjustDate,
startAndConnectLightNode,
TestContentTopic,
TestDecoder
} from "./utils.js";
describe("Waku Store, time filter", function () {
this.timeout(15000);
let waku: LightNode;
let nwaku: NimGoNode;
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
await nwaku.ensureSubscriptions();
});
afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku], [waku]);
});
[
[-10000, -10, 10],
[-10000, 1, 4],
[-10000, -2, -1],
[-10000, 0, 1000],
[-10000, -1000, 0],
[10000, 4, 1],
[10000, -10, 10]
].forEach(([msgTimeAdjustment, startTime, endTime]) => {
it(`msgTime: ${adjustDate(
new Date(),
msgTimeAdjustment
)}, startTime: ${adjustDate(
adjustDate(new Date(), msgTimeAdjustment),
startTime
)}, endTime: ${adjustDate(
adjustDate(new Date(), msgTimeAdjustment),
endTime
)}`, async function () {
const msgTimestamp = adjustDate(new Date(), msgTimeAdjustment);
expect(
await nwaku.sendMessage(
NimGoNode.toMessageRpcQuery({
payload: new Uint8Array([0]),
contentTopic: TestContentTopic,
timestamp: msgTimestamp
})
)
).to.be.true;
waku = await startAndConnectLightNode(nwaku);
const messages: IMessage[] = [];
await waku.store.queryWithOrderedCallback(
[TestDecoder],
(msg) => {
if (msg) {
messages.push(msg);
}
},
{
timeFilter: {
startTime: adjustDate(msgTimestamp, startTime),
endTime: adjustDate(msgTimestamp, endTime)
}
}
);
// in this context 0 is the messageTimestamp
if ((startTime > 0 && endTime > 0) || (startTime < 0 && endTime < 0)) {
expect(messages.length).eq(0);
} else {
expect(messages.length).eq(1);
expect(messages[0].payload![0]!).eq(0);
}
});
});
});

View File

@ -89,3 +89,9 @@ export function chunkAndReverseArray(
} }
return result.reverse(); return result.reverse();
} }
export const adjustDate = (baseDate: Date, adjustMs: number): Date => {
const adjusted = new Date(baseDate);
adjusted.setTime(adjusted.getTime() + adjustMs);
return adjusted;
};