mirror of
https://github.com/status-im/js-waku.git
synced 2025-02-24 02:48:11 +00:00
Merge pull request #1627 from waku-org/chore/new-store-tests
chore: new store tests
This commit is contained in:
commit
ac000042bf
23
package-lock.json
generated
23
package-lock.json
generated
@ -3961,6 +3961,12 @@
|
|||||||
"dev": true,
|
"dev": true,
|
||||||
"license": "MIT"
|
"license": "MIT"
|
||||||
},
|
},
|
||||||
|
"node_modules/@types/lodash": {
|
||||||
|
"version": "4.14.199",
|
||||||
|
"resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.199.tgz",
|
||||||
|
"integrity": "sha512-Vrjz5N5Ia4SEzWWgIVwnHNEnb1UE1XMkvY5DGXrAeOGE9imk0hgTHh5GyDjLDJi9OTCn9oo9dXH1uToK1VRfrg==",
|
||||||
|
"dev": true
|
||||||
|
},
|
||||||
"node_modules/@types/markdown-it": {
|
"node_modules/@types/markdown-it": {
|
||||||
"version": "12.2.3",
|
"version": "12.2.3",
|
||||||
"dev": true,
|
"dev": true,
|
||||||
@ -13571,7 +13577,8 @@
|
|||||||
},
|
},
|
||||||
"node_modules/lodash": {
|
"node_modules/lodash": {
|
||||||
"version": "4.17.21",
|
"version": "4.17.21",
|
||||||
"license": "MIT"
|
"resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
|
||||||
|
"integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg=="
|
||||||
},
|
},
|
||||||
"node_modules/lodash-es": {
|
"node_modules/lodash-es": {
|
||||||
"version": "4.17.21",
|
"version": "4.17.21",
|
||||||
@ -26134,6 +26141,7 @@
|
|||||||
"chai-as-promised": "^7.1.1",
|
"chai-as-promised": "^7.1.1",
|
||||||
"debug": "^4.3.4",
|
"debug": "^4.3.4",
|
||||||
"dockerode": "^3.3.5",
|
"dockerode": "^3.3.5",
|
||||||
|
"lodash": "^4.17.21",
|
||||||
"p-retry": "^6.1.0",
|
"p-retry": "^6.1.0",
|
||||||
"p-timeout": "^6.1.0",
|
"p-timeout": "^6.1.0",
|
||||||
"portfinder": "^1.0.32",
|
"portfinder": "^1.0.32",
|
||||||
@ -26144,6 +26152,7 @@
|
|||||||
"@libp2p/bootstrap": "^9.0.2",
|
"@libp2p/bootstrap": "^9.0.2",
|
||||||
"@types/chai": "^4.3.5",
|
"@types/chai": "^4.3.5",
|
||||||
"@types/dockerode": "^3.3.19",
|
"@types/dockerode": "^3.3.19",
|
||||||
|
"@types/lodash": "^4.14.199",
|
||||||
"@types/mocha": "^10.0.1",
|
"@types/mocha": "^10.0.1",
|
||||||
"@types/sinon": "^10.0.16",
|
"@types/sinon": "^10.0.16",
|
||||||
"@types/tail": "^2.2.1",
|
"@types/tail": "^2.2.1",
|
||||||
@ -28698,6 +28707,12 @@
|
|||||||
"version": "3.0.3",
|
"version": "3.0.3",
|
||||||
"dev": true
|
"dev": true
|
||||||
},
|
},
|
||||||
|
"@types/lodash": {
|
||||||
|
"version": "4.14.199",
|
||||||
|
"resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.199.tgz",
|
||||||
|
"integrity": "sha512-Vrjz5N5Ia4SEzWWgIVwnHNEnb1UE1XMkvY5DGXrAeOGE9imk0hgTHh5GyDjLDJi9OTCn9oo9dXH1uToK1VRfrg==",
|
||||||
|
"dev": true
|
||||||
|
},
|
||||||
"@types/markdown-it": {
|
"@types/markdown-it": {
|
||||||
"version": "12.2.3",
|
"version": "12.2.3",
|
||||||
"dev": true,
|
"dev": true,
|
||||||
@ -29322,6 +29337,7 @@
|
|||||||
"@libp2p/peer-id": "^3.0.2",
|
"@libp2p/peer-id": "^3.0.2",
|
||||||
"@types/chai": "^4.3.5",
|
"@types/chai": "^4.3.5",
|
||||||
"@types/dockerode": "^3.3.19",
|
"@types/dockerode": "^3.3.19",
|
||||||
|
"@types/lodash": "^4.14.199",
|
||||||
"@types/mocha": "^10.0.1",
|
"@types/mocha": "^10.0.1",
|
||||||
"@types/sinon": "^10.0.16",
|
"@types/sinon": "^10.0.16",
|
||||||
"@types/tail": "^2.2.1",
|
"@types/tail": "^2.2.1",
|
||||||
@ -29342,6 +29358,7 @@
|
|||||||
"dockerode": "^3.3.5",
|
"dockerode": "^3.3.5",
|
||||||
"interface-datastore": "^8.2.5",
|
"interface-datastore": "^8.2.5",
|
||||||
"libp2p": "^0.46.12",
|
"libp2p": "^0.46.12",
|
||||||
|
"lodash": "^4.17.21",
|
||||||
"mocha": "^10.2.0",
|
"mocha": "^10.2.0",
|
||||||
"npm-run-all": "^4.1.5",
|
"npm-run-all": "^4.1.5",
|
||||||
"p-retry": "^6.1.0",
|
"p-retry": "^6.1.0",
|
||||||
@ -34746,7 +34763,9 @@
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
"lodash": {
|
"lodash": {
|
||||||
"version": "4.17.21"
|
"version": "4.17.21",
|
||||||
|
"resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
|
||||||
|
"integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg=="
|
||||||
},
|
},
|
||||||
"lodash-es": {
|
"lodash-es": {
|
||||||
"version": "4.17.21"
|
"version": "4.17.21"
|
||||||
|
@ -60,6 +60,7 @@
|
|||||||
"chai-as-promised": "^7.1.1",
|
"chai-as-promised": "^7.1.1",
|
||||||
"debug": "^4.3.4",
|
"debug": "^4.3.4",
|
||||||
"dockerode": "^3.3.5",
|
"dockerode": "^3.3.5",
|
||||||
|
"lodash": "^4.17.21",
|
||||||
"p-retry": "^6.1.0",
|
"p-retry": "^6.1.0",
|
||||||
"p-timeout": "^6.1.0",
|
"p-timeout": "^6.1.0",
|
||||||
"portfinder": "^1.0.32",
|
"portfinder": "^1.0.32",
|
||||||
@ -70,6 +71,7 @@
|
|||||||
"@libp2p/bootstrap": "^9.0.2",
|
"@libp2p/bootstrap": "^9.0.2",
|
||||||
"@types/chai": "^4.3.5",
|
"@types/chai": "^4.3.5",
|
||||||
"@types/dockerode": "^3.3.19",
|
"@types/dockerode": "^3.3.19",
|
||||||
|
"@types/lodash": "^4.14.199",
|
||||||
"@types/mocha": "^10.0.1",
|
"@types/mocha": "^10.0.1",
|
||||||
"@types/sinon": "^10.0.16",
|
"@types/sinon": "^10.0.16",
|
||||||
"@types/tail": "^2.2.1",
|
"@types/tail": "^2.2.1",
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
import { DecodedMessage, DefaultPubSubTopic } from "@waku/core";
|
import { DecodedMessage, DefaultPubSubTopic } from "@waku/core";
|
||||||
import { bytesToUtf8 } from "@waku/utils/bytes";
|
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
||||||
import { AssertionError, expect } from "chai";
|
import { AssertionError, expect } from "chai";
|
||||||
import debug from "debug";
|
import debug from "debug";
|
||||||
|
import isEqual from "lodash/isEqual";
|
||||||
|
|
||||||
import { MessageRpcResponse } from "./node/interfaces.js";
|
import { MessageRpcResponse } from "./node/interfaces.js";
|
||||||
|
|
||||||
@ -36,9 +37,17 @@ export class MessageCollector {
|
|||||||
}
|
}
|
||||||
|
|
||||||
hasMessage(topic: string, text: string): boolean {
|
hasMessage(topic: string, text: string): boolean {
|
||||||
return this.list.some(
|
return this.list.some((message) => {
|
||||||
(message) => message.contentTopic === topic && message.payload === text
|
if (message.contentTopic !== topic) {
|
||||||
);
|
return false;
|
||||||
|
}
|
||||||
|
if (typeof message.payload === "string") {
|
||||||
|
return message.payload === text;
|
||||||
|
} else if (message.payload instanceof Uint8Array) {
|
||||||
|
return isEqual(message.payload, utf8ToBytes(text));
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Type guard to determine if a message is of type MessageRpcResponse
|
// Type guard to determine if a message is of type MessageRpcResponse
|
||||||
|
@ -167,8 +167,8 @@ export class NimGoNode {
|
|||||||
async startWithRetries(
|
async startWithRetries(
|
||||||
args: Args,
|
args: Args,
|
||||||
options: {
|
options: {
|
||||||
retries: number;
|
retries?: number;
|
||||||
}
|
} = { retries: 3 }
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
await pRetry(
|
await pRetry(
|
||||||
async () => {
|
async () => {
|
||||||
|
@ -1,23 +1,49 @@
|
|||||||
import { LightNode } from "@waku/interfaces";
|
import { LightNode } from "@waku/interfaces";
|
||||||
import debug from "debug";
|
import debug from "debug";
|
||||||
|
import pRetry from "p-retry";
|
||||||
|
|
||||||
import { NimGoNode } from "./index.js";
|
import { NimGoNode } from "./index.js";
|
||||||
|
|
||||||
const log = debug("waku:test");
|
const log = debug("waku:test");
|
||||||
|
|
||||||
export function tearDownNodes(
|
export async function tearDownNodes(
|
||||||
nwakuNodes: NimGoNode[],
|
nwakuNodes: NimGoNode | NimGoNode[],
|
||||||
wakuNodes: LightNode[]
|
wakuNodes: LightNode | LightNode[]
|
||||||
): void {
|
): Promise<void> {
|
||||||
nwakuNodes.forEach((nwaku) => {
|
const nNodes = Array.isArray(nwakuNodes) ? nwakuNodes : [nwakuNodes];
|
||||||
|
const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes];
|
||||||
|
|
||||||
|
const stopNwakuNodes = nNodes.map(async (nwaku) => {
|
||||||
if (nwaku) {
|
if (nwaku) {
|
||||||
nwaku.stop().catch((e) => log("Nwaku failed to stop", e));
|
await pRetry(
|
||||||
|
async () => {
|
||||||
|
try {
|
||||||
|
await nwaku.stop();
|
||||||
|
} catch (error) {
|
||||||
|
log("Nwaku failed to stop:", error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{ retries: 3 }
|
||||||
|
);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
wakuNodes.forEach((waku) => {
|
const stopWakuNodes = wNodes.map(async (waku) => {
|
||||||
if (waku) {
|
if (waku) {
|
||||||
waku.stop().catch((e) => log("Waku failed to stop", e));
|
await pRetry(
|
||||||
|
async () => {
|
||||||
|
try {
|
||||||
|
await waku.stop();
|
||||||
|
} catch (error) {
|
||||||
|
log("Waku failed to stop:", error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{ retries: 3 }
|
||||||
|
);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
await Promise.all([...stopNwakuNodes, ...stopWakuNodes]);
|
||||||
}
|
}
|
||||||
|
@ -149,10 +149,12 @@ describe("ConnectionManager", function () {
|
|||||||
let waku: LightNode;
|
let waku: LightNode;
|
||||||
|
|
||||||
this.beforeEach(async function () {
|
this.beforeEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
waku = await createLightNode();
|
waku = await createLightNode();
|
||||||
});
|
});
|
||||||
|
|
||||||
afterEach(async () => {
|
afterEach(async () => {
|
||||||
|
this.timeout(15000);
|
||||||
await waku.stop();
|
await waku.stop();
|
||||||
sinon.restore();
|
sinon.restore();
|
||||||
});
|
});
|
||||||
|
@ -50,7 +50,8 @@ describe("Waku Filter V2: Multiple PubSubtopics", function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
this.afterEach(async function () {
|
this.afterEach(async function () {
|
||||||
tearDownNodes([nwaku, nwaku2], [waku]);
|
this.timeout(15000);
|
||||||
|
await tearDownNodes([nwaku, nwaku2], waku);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Subscribe and receive messages on custom pubsubtopic", async function () {
|
it("Subscribe and receive messages on custom pubsubtopic", async function () {
|
||||||
|
@ -29,7 +29,8 @@ describe("Waku Filter V2: Ping", function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
this.afterEach(async function () {
|
this.afterEach(async function () {
|
||||||
tearDownNodes([nwaku], [waku]);
|
this.timeout(15000);
|
||||||
|
await tearDownNodes(nwaku, waku);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Ping on subscribed peer", async function () {
|
it("Ping on subscribed peer", async function () {
|
||||||
|
@ -37,7 +37,8 @@ describe("Waku Filter V2: FilterPush", function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
this.afterEach(async function () {
|
this.afterEach(async function () {
|
||||||
tearDownNodes([nwaku], [waku]);
|
this.timeout(15000);
|
||||||
|
await tearDownNodes(nwaku, waku);
|
||||||
});
|
});
|
||||||
|
|
||||||
TEST_STRING.forEach((testItem) => {
|
TEST_STRING.forEach((testItem) => {
|
||||||
|
@ -48,7 +48,8 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
this.afterEach(async function () {
|
this.afterEach(async function () {
|
||||||
tearDownNodes([nwaku, nwaku2], [waku]);
|
this.timeout(15000);
|
||||||
|
await tearDownNodes([nwaku, nwaku2], waku);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Subscribe and receive messages via lightPush", async function () {
|
it("Subscribe and receive messages via lightPush", async function () {
|
||||||
@ -341,7 +342,7 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check if both messages were received
|
// Check if both messages were received
|
||||||
expect(messageCollector.hasMessage(TestContentTopic, "M1")).to.be.true;
|
expect(messageCollector.hasMessage(TestContentTopic, "M1")).to.eq(true);
|
||||||
expect(messageCollector.hasMessage(newContentTopic, "M2")).to.be.true;
|
expect(messageCollector.hasMessage(newContentTopic, "M2")).to.eq(true);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -34,7 +34,8 @@ describe("Waku Filter V2: Unsubscribe", function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
this.afterEach(async function () {
|
this.afterEach(async function () {
|
||||||
tearDownNodes([nwaku], [waku]);
|
this.timeout(15000);
|
||||||
|
await tearDownNodes(nwaku, waku);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {
|
it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {
|
||||||
|
@ -35,7 +35,8 @@ describe("Waku Light Push", function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
this.afterEach(async function () {
|
this.afterEach(async function () {
|
||||||
tearDownNodes([nwaku], [waku]);
|
this.timeout(15000);
|
||||||
|
await tearDownNodes(nwaku, waku);
|
||||||
});
|
});
|
||||||
|
|
||||||
TEST_STRING.forEach((testItem) => {
|
TEST_STRING.forEach((testItem) => {
|
@ -36,7 +36,8 @@ describe("Waku Light Push : Multiple PubSubtopics", function () {
|
|||||||
});
|
});
|
||||||
let nimPeerId: PeerId;
|
let nimPeerId: PeerId;
|
||||||
|
|
||||||
beforeEach(async function () {
|
this.beforeEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
[nwaku, waku] = await runNodes(this, [
|
[nwaku, waku] = await runNodes(this, [
|
||||||
customPubSubTopic,
|
customPubSubTopic,
|
||||||
DefaultPubSubTopic
|
DefaultPubSubTopic
|
||||||
@ -46,7 +47,8 @@ describe("Waku Light Push : Multiple PubSubtopics", function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
this.afterEach(async function () {
|
this.afterEach(async function () {
|
||||||
tearDownNodes([nwaku, nwaku2], [waku]);
|
this.timeout(15000);
|
||||||
|
await tearDownNodes([nwaku, nwaku2], waku);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Push message on custom pubSubTopic", async function () {
|
it("Push message on custom pubSubTopic", async function () {
|
||||||
|
@ -1,772 +0,0 @@
|
|||||||
import {
|
|
||||||
createCursor,
|
|
||||||
createDecoder,
|
|
||||||
createEncoder,
|
|
||||||
DecodedMessage,
|
|
||||||
Decoder,
|
|
||||||
DefaultPubSubTopic,
|
|
||||||
PageDirection,
|
|
||||||
waitForRemotePeer
|
|
||||||
} from "@waku/core";
|
|
||||||
import type { IMessage, LightNode } from "@waku/interfaces";
|
|
||||||
import { Protocols } from "@waku/interfaces";
|
|
||||||
import {
|
|
||||||
createDecoder as createEciesDecoder,
|
|
||||||
createEncoder as createEciesEncoder,
|
|
||||||
generatePrivateKey,
|
|
||||||
getPublicKey
|
|
||||||
} from "@waku/message-encryption/ecies";
|
|
||||||
import {
|
|
||||||
createDecoder as createSymDecoder,
|
|
||||||
createEncoder as createSymEncoder,
|
|
||||||
generateSymmetricKey
|
|
||||||
} from "@waku/message-encryption/symmetric";
|
|
||||||
import { createLightNode } from "@waku/sdk";
|
|
||||||
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
|
||||||
import { expect } from "chai";
|
|
||||||
import debug from "debug";
|
|
||||||
|
|
||||||
import {
|
|
||||||
delay,
|
|
||||||
makeLogFileName,
|
|
||||||
NOISE_KEY_1,
|
|
||||||
NOISE_KEY_2
|
|
||||||
} from "../src/index.js";
|
|
||||||
import { NimGoNode } from "../src/node/node.js";
|
|
||||||
|
|
||||||
const log = debug("waku:test:store");
|
|
||||||
|
|
||||||
const TestContentTopic = "/test/1/waku-store/utf8";
|
|
||||||
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
|
|
||||||
const TestDecoder = createDecoder(TestContentTopic);
|
|
||||||
|
|
||||||
describe("Waku Store", () => {
|
|
||||||
let waku: LightNode;
|
|
||||||
let nwaku: NimGoNode;
|
|
||||||
|
|
||||||
beforeEach(async function () {
|
|
||||||
this.timeout(15_000);
|
|
||||||
nwaku = new NimGoNode(makeLogFileName(this));
|
|
||||||
await nwaku.start({ store: true, lightpush: true, relay: true });
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(async function () {
|
|
||||||
!!nwaku &&
|
|
||||||
nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e));
|
|
||||||
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Generator", async function () {
|
|
||||||
this.timeout(15_000);
|
|
||||||
const totalMsgs = 20;
|
|
||||||
|
|
||||||
for (let i = 0; i < totalMsgs; i++) {
|
|
||||||
expect(
|
|
||||||
await nwaku.sendMessage(
|
|
||||||
NimGoNode.toMessageRpcQuery({
|
|
||||||
payload: new Uint8Array([i]),
|
|
||||||
contentTopic: TestContentTopic
|
|
||||||
})
|
|
||||||
)
|
|
||||||
).to.be.true;
|
|
||||||
}
|
|
||||||
|
|
||||||
waku = await createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
|
||||||
|
|
||||||
const messages: IMessage[] = [];
|
|
||||||
let promises: Promise<void>[] = [];
|
|
||||||
for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) {
|
|
||||||
const _promises = msgPromises.map(async (promise) => {
|
|
||||||
const msg = await promise;
|
|
||||||
if (msg) {
|
|
||||||
messages.push(msg);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
promises = promises.concat(_promises);
|
|
||||||
}
|
|
||||||
await Promise.all(promises);
|
|
||||||
|
|
||||||
expect(messages?.length).eq(totalMsgs);
|
|
||||||
const result = messages?.findIndex((msg) => {
|
|
||||||
return msg.payload[0]! === 0;
|
|
||||||
});
|
|
||||||
expect(result).to.not.eq(-1);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Generator, no message returned", async function () {
|
|
||||||
this.timeout(15_000);
|
|
||||||
|
|
||||||
waku = await createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
|
||||||
|
|
||||||
const messages: IMessage[] = [];
|
|
||||||
let promises: Promise<void>[] = [];
|
|
||||||
for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) {
|
|
||||||
const _promises = msgPromises.map(async (promise) => {
|
|
||||||
const msg = await promise;
|
|
||||||
if (msg) {
|
|
||||||
messages.push(msg);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
promises = promises.concat(_promises);
|
|
||||||
}
|
|
||||||
await Promise.all(promises);
|
|
||||||
|
|
||||||
expect(messages?.length).eq(0);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Passing a cursor", async function () {
|
|
||||||
this.timeout(4_000);
|
|
||||||
const totalMsgs = 20;
|
|
||||||
|
|
||||||
for (let i = 0; i < totalMsgs; i++) {
|
|
||||||
expect(
|
|
||||||
await nwaku.sendMessage(
|
|
||||||
NimGoNode.toMessageRpcQuery({
|
|
||||||
payload: utf8ToBytes(`Message ${i}`),
|
|
||||||
contentTopic: TestContentTopic
|
|
||||||
})
|
|
||||||
)
|
|
||||||
).to.be.true;
|
|
||||||
}
|
|
||||||
|
|
||||||
waku = await createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
|
||||||
|
|
||||||
const query = waku.store.queryGenerator([TestDecoder]);
|
|
||||||
|
|
||||||
// messages in reversed order (first message at last index)
|
|
||||||
const messages: DecodedMessage[] = [];
|
|
||||||
for await (const page of query) {
|
|
||||||
for await (const msg of page.reverse()) {
|
|
||||||
messages.push(msg as DecodedMessage);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// index 2 would mean the third last message sent
|
|
||||||
const cursorIndex = 2;
|
|
||||||
|
|
||||||
// create cursor to extract messages after the 3rd index
|
|
||||||
const cursor = await createCursor(messages[cursorIndex]);
|
|
||||||
|
|
||||||
const messagesAfterCursor: DecodedMessage[] = [];
|
|
||||||
for await (const page of waku.store.queryGenerator([TestDecoder], {
|
|
||||||
cursor
|
|
||||||
})) {
|
|
||||||
for await (const msg of page.reverse()) {
|
|
||||||
messagesAfterCursor.push(msg as DecodedMessage);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const testMessage = messagesAfterCursor[0];
|
|
||||||
|
|
||||||
expect(messages.length).be.eq(totalMsgs);
|
|
||||||
|
|
||||||
expect(bytesToUtf8(testMessage.payload)).to.be.eq(
|
|
||||||
bytesToUtf8(messages[cursorIndex + 1].payload)
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Callback on promise", async function () {
|
|
||||||
this.timeout(15_000);
|
|
||||||
|
|
||||||
const totalMsgs = 15;
|
|
||||||
|
|
||||||
for (let i = 0; i < totalMsgs; i++) {
|
|
||||||
expect(
|
|
||||||
await nwaku.sendMessage(
|
|
||||||
NimGoNode.toMessageRpcQuery({
|
|
||||||
payload: new Uint8Array([i]),
|
|
||||||
contentTopic: TestContentTopic
|
|
||||||
})
|
|
||||||
)
|
|
||||||
).to.be.true;
|
|
||||||
}
|
|
||||||
|
|
||||||
waku = await createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
|
||||||
|
|
||||||
const messages: IMessage[] = [];
|
|
||||||
await waku.store.queryWithPromiseCallback(
|
|
||||||
[TestDecoder],
|
|
||||||
async (msgPromise) => {
|
|
||||||
const msg = await msgPromise;
|
|
||||||
if (msg) {
|
|
||||||
messages.push(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(messages?.length).eq(totalMsgs);
|
|
||||||
const result = messages?.findIndex((msg) => {
|
|
||||||
return msg.payload[0]! === 0;
|
|
||||||
});
|
|
||||||
expect(result).to.not.eq(-1);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Callback on promise, aborts when callback returns true", async function () {
|
|
||||||
this.timeout(15_000);
|
|
||||||
|
|
||||||
const totalMsgs = 20;
|
|
||||||
|
|
||||||
for (let i = 0; i < totalMsgs; i++) {
|
|
||||||
expect(
|
|
||||||
await nwaku.sendMessage(
|
|
||||||
NimGoNode.toMessageRpcQuery({
|
|
||||||
payload: new Uint8Array([i]),
|
|
||||||
contentTopic: TestContentTopic
|
|
||||||
})
|
|
||||||
)
|
|
||||||
).to.be.true;
|
|
||||||
}
|
|
||||||
|
|
||||||
waku = await createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
|
||||||
|
|
||||||
const desiredMsgs = 14;
|
|
||||||
const messages: IMessage[] = [];
|
|
||||||
await waku.store.queryWithPromiseCallback(
|
|
||||||
[TestDecoder],
|
|
||||||
async (msgPromise) => {
|
|
||||||
const msg = await msgPromise;
|
|
||||||
if (msg) {
|
|
||||||
messages.push(msg);
|
|
||||||
}
|
|
||||||
return messages.length >= desiredMsgs;
|
|
||||||
},
|
|
||||||
{ pageSize: 7 }
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(messages?.length).eq(desiredMsgs);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Ordered Callback - Forward", async function () {
|
|
||||||
this.timeout(15_000);
|
|
||||||
|
|
||||||
const totalMsgs = 18;
|
|
||||||
for (let i = 0; i < totalMsgs; i++) {
|
|
||||||
expect(
|
|
||||||
await nwaku.sendMessage(
|
|
||||||
NimGoNode.toMessageRpcQuery({
|
|
||||||
payload: new Uint8Array([i]),
|
|
||||||
contentTopic: TestContentTopic
|
|
||||||
})
|
|
||||||
)
|
|
||||||
).to.be.true;
|
|
||||||
await delay(1); // to ensure each timestamp is unique.
|
|
||||||
}
|
|
||||||
|
|
||||||
waku = await createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
|
||||||
|
|
||||||
const messages: IMessage[] = [];
|
|
||||||
await waku.store.queryWithOrderedCallback(
|
|
||||||
[TestDecoder],
|
|
||||||
async (msg) => {
|
|
||||||
messages.push(msg);
|
|
||||||
},
|
|
||||||
{
|
|
||||||
pageDirection: PageDirection.FORWARD
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(messages?.length).eq(totalMsgs);
|
|
||||||
const payloads = messages.map((msg) => msg.payload[0]!);
|
|
||||||
expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys()));
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Ordered Callback - Backward", async function () {
|
|
||||||
this.timeout(15_000);
|
|
||||||
|
|
||||||
const totalMsgs = 18;
|
|
||||||
for (let i = 0; i < totalMsgs; i++) {
|
|
||||||
expect(
|
|
||||||
await nwaku.sendMessage(
|
|
||||||
NimGoNode.toMessageRpcQuery({
|
|
||||||
payload: new Uint8Array([i]),
|
|
||||||
contentTopic: TestContentTopic
|
|
||||||
})
|
|
||||||
)
|
|
||||||
).to.be.true;
|
|
||||||
await delay(1); // to ensure each timestamp is unique.
|
|
||||||
}
|
|
||||||
|
|
||||||
waku = await createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
|
||||||
|
|
||||||
let messages: IMessage[] = [];
|
|
||||||
await waku.store.queryWithOrderedCallback(
|
|
||||||
[TestDecoder],
|
|
||||||
async (msg) => {
|
|
||||||
messages.push(msg);
|
|
||||||
},
|
|
||||||
{
|
|
||||||
pageDirection: PageDirection.BACKWARD
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
messages = messages.reverse();
|
|
||||||
|
|
||||||
expect(messages?.length).eq(totalMsgs);
|
|
||||||
const payloads = messages.map((msg) => msg.payload![0]!);
|
|
||||||
expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys()));
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Generator, with asymmetric & symmetric encrypted messages", async function () {
|
|
||||||
this.timeout(15_000);
|
|
||||||
|
|
||||||
const asymText = "This message is encrypted for me using asymmetric";
|
|
||||||
const asymTopic = "/test/1/asymmetric/proto";
|
|
||||||
const symText =
|
|
||||||
"This message is encrypted for me using symmetric encryption";
|
|
||||||
const symTopic = "/test/1/symmetric/proto";
|
|
||||||
const clearText = "This is a clear text message for everyone to read";
|
|
||||||
const otherText =
|
|
||||||
"This message is not for and I must not be able to read it";
|
|
||||||
|
|
||||||
const timestamp = new Date();
|
|
||||||
|
|
||||||
const asymMsg = { payload: utf8ToBytes(asymText), timestamp };
|
|
||||||
const symMsg = {
|
|
||||||
payload: utf8ToBytes(symText),
|
|
||||||
timestamp: new Date(timestamp.valueOf() + 1)
|
|
||||||
};
|
|
||||||
const clearMsg = {
|
|
||||||
payload: utf8ToBytes(clearText),
|
|
||||||
timestamp: new Date(timestamp.valueOf() + 2)
|
|
||||||
};
|
|
||||||
const otherMsg = {
|
|
||||||
payload: utf8ToBytes(otherText),
|
|
||||||
timestamp: new Date(timestamp.valueOf() + 3)
|
|
||||||
};
|
|
||||||
|
|
||||||
const privateKey = generatePrivateKey();
|
|
||||||
const symKey = generateSymmetricKey();
|
|
||||||
const publicKey = getPublicKey(privateKey);
|
|
||||||
|
|
||||||
const eciesEncoder = createEciesEncoder({
|
|
||||||
contentTopic: asymTopic,
|
|
||||||
publicKey
|
|
||||||
});
|
|
||||||
const symEncoder = createSymEncoder({
|
|
||||||
contentTopic: symTopic,
|
|
||||||
symKey
|
|
||||||
});
|
|
||||||
|
|
||||||
const otherEncoder = createEciesEncoder({
|
|
||||||
contentTopic: TestContentTopic,
|
|
||||||
publicKey: getPublicKey(generatePrivateKey())
|
|
||||||
});
|
|
||||||
|
|
||||||
const eciesDecoder = createEciesDecoder(asymTopic, privateKey);
|
|
||||||
const symDecoder = createSymDecoder(symTopic, symKey);
|
|
||||||
|
|
||||||
const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([
|
|
||||||
createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
}).then((waku) => waku.start().then(() => waku)),
|
|
||||||
createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_2
|
|
||||||
}).then((waku) => waku.start().then(() => waku)),
|
|
||||||
nwaku.getMultiaddrWithId()
|
|
||||||
]);
|
|
||||||
|
|
||||||
log("Waku nodes created");
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
waku1.dial(nimWakuMultiaddr),
|
|
||||||
waku2.dial(nimWakuMultiaddr)
|
|
||||||
]);
|
|
||||||
|
|
||||||
log("Waku nodes connected to nwaku");
|
|
||||||
|
|
||||||
await waitForRemotePeer(waku1, [Protocols.LightPush]);
|
|
||||||
|
|
||||||
log("Sending messages using light push");
|
|
||||||
await Promise.all([
|
|
||||||
waku1.lightPush.send(eciesEncoder, asymMsg),
|
|
||||||
waku1.lightPush.send(symEncoder, symMsg),
|
|
||||||
waku1.lightPush.send(otherEncoder, otherMsg),
|
|
||||||
waku1.lightPush.send(TestEncoder, clearMsg)
|
|
||||||
]);
|
|
||||||
|
|
||||||
await waitForRemotePeer(waku2, [Protocols.Store]);
|
|
||||||
|
|
||||||
const messages: DecodedMessage[] = [];
|
|
||||||
log("Retrieve messages from store");
|
|
||||||
|
|
||||||
for await (const msgPromises of waku2.store.queryGenerator([
|
|
||||||
eciesDecoder,
|
|
||||||
symDecoder,
|
|
||||||
TestDecoder
|
|
||||||
])) {
|
|
||||||
for (const promise of msgPromises) {
|
|
||||||
const msg = await promise;
|
|
||||||
if (msg) {
|
|
||||||
messages.push(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Messages are ordered from oldest to latest within a page (1 page query)
|
|
||||||
expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText);
|
|
||||||
expect(bytesToUtf8(messages[1].payload!)).to.eq(symText);
|
|
||||||
expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText);
|
|
||||||
expect(messages?.length).eq(3);
|
|
||||||
|
|
||||||
!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
|
|
||||||
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Ordered callback, using start and end time", async function () {
|
|
||||||
this.timeout(20000);
|
|
||||||
|
|
||||||
const now = new Date();
|
|
||||||
|
|
||||||
const startTime = new Date();
|
|
||||||
// Set start time 15 seconds in the past
|
|
||||||
startTime.setTime(now.getTime() - 15 * 1000);
|
|
||||||
|
|
||||||
const message1Timestamp = new Date();
|
|
||||||
// Set first message was 10 seconds in the past
|
|
||||||
message1Timestamp.setTime(now.getTime() - 10 * 1000);
|
|
||||||
|
|
||||||
const message2Timestamp = new Date();
|
|
||||||
// Set second message 2 seconds in the past
|
|
||||||
message2Timestamp.setTime(now.getTime() - 2 * 1000);
|
|
||||||
const messageTimestamps = [message1Timestamp, message2Timestamp];
|
|
||||||
|
|
||||||
const endTime = new Date();
|
|
||||||
// Set end time 1 second in the past
|
|
||||||
endTime.setTime(now.getTime() - 1000);
|
|
||||||
|
|
||||||
for (let i = 0; i < 2; i++) {
|
|
||||||
expect(
|
|
||||||
await nwaku.sendMessage(
|
|
||||||
NimGoNode.toMessageRpcQuery({
|
|
||||||
payload: new Uint8Array([i]),
|
|
||||||
contentTopic: TestContentTopic,
|
|
||||||
timestamp: messageTimestamps[i]
|
|
||||||
})
|
|
||||||
)
|
|
||||||
).to.be.true;
|
|
||||||
}
|
|
||||||
|
|
||||||
waku = await createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
|
||||||
|
|
||||||
const firstMessages: IMessage[] = [];
|
|
||||||
await waku.store.queryWithOrderedCallback(
|
|
||||||
[TestDecoder],
|
|
||||||
(msg) => {
|
|
||||||
if (msg) {
|
|
||||||
firstMessages.push(msg);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
timeFilter: { startTime, endTime: message1Timestamp }
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
const bothMessages: IMessage[] = [];
|
|
||||||
await waku.store.queryWithOrderedCallback(
|
|
||||||
[TestDecoder],
|
|
||||||
async (msg) => {
|
|
||||||
bothMessages.push(msg);
|
|
||||||
},
|
|
||||||
{
|
|
||||||
timeFilter: {
|
|
||||||
startTime,
|
|
||||||
endTime
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(firstMessages?.length).eq(1);
|
|
||||||
|
|
||||||
expect(firstMessages[0].payload![0]!).eq(0);
|
|
||||||
|
|
||||||
expect(bothMessages?.length).eq(2);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Ordered callback, aborts when callback returns true", async function () {
|
|
||||||
this.timeout(15_000);
|
|
||||||
|
|
||||||
const totalMsgs = 20;
|
|
||||||
|
|
||||||
for (let i = 0; i < totalMsgs; i++) {
|
|
||||||
expect(
|
|
||||||
await nwaku.sendMessage(
|
|
||||||
NimGoNode.toMessageRpcQuery({
|
|
||||||
payload: new Uint8Array([i]),
|
|
||||||
contentTopic: TestContentTopic
|
|
||||||
})
|
|
||||||
)
|
|
||||||
).to.be.true;
|
|
||||||
await delay(1); // to ensure each timestamp is unique.
|
|
||||||
}
|
|
||||||
|
|
||||||
waku = await createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
|
||||||
|
|
||||||
const desiredMsgs = 14;
|
|
||||||
const messages: IMessage[] = [];
|
|
||||||
await waku.store.queryWithOrderedCallback(
|
|
||||||
[TestDecoder],
|
|
||||||
async (msg) => {
|
|
||||||
messages.push(msg);
|
|
||||||
return messages.length >= desiredMsgs;
|
|
||||||
},
|
|
||||||
{ pageSize: 7 }
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(messages?.length).eq(desiredMsgs);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("Waku Store, custom pubsub topic", () => {
|
|
||||||
const customPubSubTopic = "/waku/2/custom-dapp/proto";
|
|
||||||
let waku: LightNode;
|
|
||||||
let nwaku: NimGoNode;
|
|
||||||
let nwaku2: NimGoNode;
|
|
||||||
|
|
||||||
const customContentTopic = "/test/2/waku-store/utf8";
|
|
||||||
|
|
||||||
const customTestDecoder = createDecoder(
|
|
||||||
customContentTopic,
|
|
||||||
customPubSubTopic
|
|
||||||
);
|
|
||||||
|
|
||||||
beforeEach(async function () {
|
|
||||||
this.timeout(15_000);
|
|
||||||
nwaku = new NimGoNode(makeLogFileName(this));
|
|
||||||
await nwaku.start({
|
|
||||||
store: true,
|
|
||||||
topic: [customPubSubTopic, DefaultPubSubTopic],
|
|
||||||
relay: true
|
|
||||||
});
|
|
||||||
await nwaku.ensureSubscriptions([customPubSubTopic, DefaultPubSubTopic]);
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(async function () {
|
|
||||||
!!nwaku &&
|
|
||||||
nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e));
|
|
||||||
!!nwaku2 &&
|
|
||||||
nwaku2.stop().catch((e) => console.log("Nwaku failed to stop", e));
|
|
||||||
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Generator, custom pubsub topic", async function () {
|
|
||||||
this.timeout(15_000);
|
|
||||||
|
|
||||||
const totalMsgs = 20;
|
|
||||||
for (let i = 0; i < totalMsgs; i++) {
|
|
||||||
expect(
|
|
||||||
await nwaku.sendMessage(
|
|
||||||
NimGoNode.toMessageRpcQuery({
|
|
||||||
payload: new Uint8Array([i]),
|
|
||||||
contentTopic: customContentTopic
|
|
||||||
}),
|
|
||||||
customPubSubTopic
|
|
||||||
)
|
|
||||||
).to.be.true;
|
|
||||||
}
|
|
||||||
|
|
||||||
waku = await createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1,
|
|
||||||
pubSubTopics: [customPubSubTopic]
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
|
||||||
|
|
||||||
const messages: IMessage[] = [];
|
|
||||||
let promises: Promise<void>[] = [];
|
|
||||||
for await (const msgPromises of waku.store.queryGenerator([
|
|
||||||
customTestDecoder
|
|
||||||
])) {
|
|
||||||
const _promises = msgPromises.map(async (promise) => {
|
|
||||||
const msg = await promise;
|
|
||||||
if (msg) {
|
|
||||||
messages.push(msg);
|
|
||||||
expect(msg.pubSubTopic).to.eq(customPubSubTopic);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
promises = promises.concat(_promises);
|
|
||||||
}
|
|
||||||
await Promise.all(promises);
|
|
||||||
|
|
||||||
expect(messages?.length).eq(totalMsgs);
|
|
||||||
const result = messages?.findIndex((msg) => {
|
|
||||||
return msg.payload![0]! === 0;
|
|
||||||
});
|
|
||||||
expect(result).to.not.eq(-1);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Generator, 2 different pubsubtopics", async function () {
|
|
||||||
this.timeout(10000);
|
|
||||||
|
|
||||||
const totalMsgs = 10;
|
|
||||||
await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic);
|
|
||||||
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
|
|
||||||
|
|
||||||
waku = await createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1,
|
|
||||||
pubSubTopics: [customPubSubTopic, DefaultPubSubTopic]
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
|
||||||
|
|
||||||
const customMessages = await processMessages(
|
|
||||||
waku,
|
|
||||||
[customTestDecoder],
|
|
||||||
customPubSubTopic
|
|
||||||
);
|
|
||||||
expect(customMessages?.length).eq(totalMsgs);
|
|
||||||
const result1 = customMessages?.findIndex((msg) => {
|
|
||||||
return msg.payload![0]! === 0;
|
|
||||||
});
|
|
||||||
expect(result1).to.not.eq(-1);
|
|
||||||
|
|
||||||
const testMessages = await processMessages(
|
|
||||||
waku,
|
|
||||||
[TestDecoder],
|
|
||||||
DefaultPubSubTopic
|
|
||||||
);
|
|
||||||
expect(testMessages?.length).eq(totalMsgs);
|
|
||||||
const result2 = testMessages?.findIndex((msg) => {
|
|
||||||
return msg.payload![0]! === 0;
|
|
||||||
});
|
|
||||||
expect(result2).to.not.eq(-1);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () {
|
|
||||||
this.timeout(10000);
|
|
||||||
|
|
||||||
// Set up and start a new nwaku node with Default PubSubtopic
|
|
||||||
nwaku2 = new NimGoNode(makeLogFileName(this) + "2");
|
|
||||||
await nwaku2.start({
|
|
||||||
store: true,
|
|
||||||
topic: [DefaultPubSubTopic],
|
|
||||||
relay: true
|
|
||||||
});
|
|
||||||
|
|
||||||
const totalMsgs = 10;
|
|
||||||
await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic);
|
|
||||||
await sendMessages(nwaku2, totalMsgs, TestContentTopic, DefaultPubSubTopic);
|
|
||||||
|
|
||||||
waku = await createLightNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1,
|
|
||||||
pubSubTopics: [customPubSubTopic, DefaultPubSubTopic]
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waku.dial(await nwaku2.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
|
||||||
|
|
||||||
let customMessages: IMessage[] = [];
|
|
||||||
let testMessages: IMessage[] = [];
|
|
||||||
|
|
||||||
while (
|
|
||||||
customMessages.length != totalMsgs ||
|
|
||||||
testMessages.length != totalMsgs
|
|
||||||
) {
|
|
||||||
customMessages = await processMessages(
|
|
||||||
waku,
|
|
||||||
[customTestDecoder],
|
|
||||||
customPubSubTopic
|
|
||||||
);
|
|
||||||
testMessages = await processMessages(
|
|
||||||
waku,
|
|
||||||
[TestDecoder],
|
|
||||||
DefaultPubSubTopic
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// will move those 2 reusable functions to store/utils when refactoring store tests but with another PR
|
|
||||||
async function sendMessages(
|
|
||||||
instance: NimGoNode,
|
|
||||||
numMessages: number,
|
|
||||||
contentTopic: string,
|
|
||||||
pubSubTopic: string
|
|
||||||
): Promise<void> {
|
|
||||||
for (let i = 0; i < numMessages; i++) {
|
|
||||||
expect(
|
|
||||||
await instance.sendMessage(
|
|
||||||
NimGoNode.toMessageRpcQuery({
|
|
||||||
payload: new Uint8Array([i]),
|
|
||||||
contentTopic: contentTopic
|
|
||||||
}),
|
|
||||||
pubSubTopic
|
|
||||||
)
|
|
||||||
).to.be.true;
|
|
||||||
}
|
|
||||||
await delay(1); // to ensure each timestamp is unique.
|
|
||||||
}
|
|
||||||
|
|
||||||
async function processMessages(
|
|
||||||
instance: LightNode,
|
|
||||||
decoders: Array<Decoder>,
|
|
||||||
expectedTopic: string
|
|
||||||
): Promise<IMessage[]> {
|
|
||||||
const localMessages: IMessage[] = [];
|
|
||||||
let localPromises: Promise<void>[] = [];
|
|
||||||
for await (const msgPromises of instance.store.queryGenerator(decoders)) {
|
|
||||||
const _promises = msgPromises.map(async (promise) => {
|
|
||||||
const msg = await promise;
|
|
||||||
if (msg) {
|
|
||||||
localMessages.push(msg);
|
|
||||||
expect(msg.pubSubTopic).to.eq(expectedTopic);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
localPromises = localPromises.concat(_promises);
|
|
||||||
}
|
|
||||||
await Promise.all(localPromises);
|
|
||||||
return localMessages;
|
|
||||||
}
|
|
||||||
});
|
|
203
packages/tests/tests/store/cursor.node.spec.ts
Normal file
203
packages/tests/tests/store/cursor.node.spec.ts
Normal file
@ -0,0 +1,203 @@
|
|||||||
|
import { createCursor, DecodedMessage, DefaultPubSubTopic } from "@waku/core";
|
||||||
|
import type { LightNode } from "@waku/interfaces";
|
||||||
|
import { bytesToUtf8 } from "@waku/utils/bytes";
|
||||||
|
import { expect } from "chai";
|
||||||
|
|
||||||
|
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";
|
||||||
|
|
||||||
|
import {
|
||||||
|
customPubSubTopic,
|
||||||
|
sendMessages,
|
||||||
|
startAndConnectLightNode,
|
||||||
|
TestContentTopic,
|
||||||
|
TestDecoder,
|
||||||
|
totalMsgs
|
||||||
|
} from "./utils.js";
|
||||||
|
|
||||||
|
describe("Waku Store, cursor", function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
let waku: LightNode;
|
||||||
|
let waku2: LightNode;
|
||||||
|
let nwaku: NimGoNode;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
nwaku = new NimGoNode(makeLogFileName(this));
|
||||||
|
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
|
||||||
|
await nwaku.ensureSubscriptions();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
await tearDownNodes(nwaku, [waku, waku2]);
|
||||||
|
});
|
||||||
|
|
||||||
|
[
|
||||||
|
[2, 4],
|
||||||
|
[0, 20],
|
||||||
|
[10, 40],
|
||||||
|
[19, 20],
|
||||||
|
[19, 50],
|
||||||
|
[110, 120]
|
||||||
|
].forEach(([cursorIndex, messageCount]) => {
|
||||||
|
it(`Passing a valid cursor at ${cursorIndex} index when there are ${messageCount} messages`, async function () {
|
||||||
|
await sendMessages(
|
||||||
|
nwaku,
|
||||||
|
messageCount,
|
||||||
|
TestContentTopic,
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
// messages in reversed order (first message at last index)
|
||||||
|
const messages: DecodedMessage[] = [];
|
||||||
|
for await (const page of waku.store.queryGenerator([TestDecoder])) {
|
||||||
|
for await (const msg of page.reverse()) {
|
||||||
|
messages.push(msg as DecodedMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// create cursor to extract messages after the cursorIndex
|
||||||
|
const cursor = await createCursor(messages[cursorIndex]);
|
||||||
|
|
||||||
|
const messagesAfterCursor: DecodedMessage[] = [];
|
||||||
|
for await (const page of waku.store.queryGenerator([TestDecoder], {
|
||||||
|
cursor
|
||||||
|
})) {
|
||||||
|
for await (const msg of page.reverse()) {
|
||||||
|
if (msg) {
|
||||||
|
messagesAfterCursor.push(msg as DecodedMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(messages.length).be.eql(messageCount);
|
||||||
|
expect(messagesAfterCursor.length).be.eql(messageCount - cursorIndex - 1);
|
||||||
|
if (cursorIndex == messages.length - 1) {
|
||||||
|
// in this case the cursor will return nothin because it points at the end of the list
|
||||||
|
expect(messagesAfterCursor).be.eql([]);
|
||||||
|
} else {
|
||||||
|
expect(bytesToUtf8(messagesAfterCursor[0].payload)).to.be.eq(
|
||||||
|
bytesToUtf8(messages[cursorIndex + 1].payload)
|
||||||
|
);
|
||||||
|
expect(
|
||||||
|
bytesToUtf8(
|
||||||
|
messagesAfterCursor[messagesAfterCursor.length - 1].payload
|
||||||
|
)
|
||||||
|
).to.be.eq(bytesToUtf8(messages[messages.length - 1].payload));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Reusing cursor across nodes", async function () {
|
||||||
|
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
waku2 = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
// messages in reversed order (first message at last index)
|
||||||
|
const messages: DecodedMessage[] = [];
|
||||||
|
for await (const page of waku.store.queryGenerator([TestDecoder])) {
|
||||||
|
for await (const msg of page.reverse()) {
|
||||||
|
messages.push(msg as DecodedMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// create cursor to extract messages after the cursorIndex
|
||||||
|
const cursor = await createCursor(messages[5]);
|
||||||
|
|
||||||
|
// query node2 with the cursor from node1
|
||||||
|
const messagesAfterCursor: DecodedMessage[] = [];
|
||||||
|
for await (const page of waku2.store.queryGenerator([TestDecoder], {
|
||||||
|
cursor
|
||||||
|
})) {
|
||||||
|
for await (const msg of page.reverse()) {
|
||||||
|
if (msg) {
|
||||||
|
messagesAfterCursor.push(msg as DecodedMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(messages.length).be.eql(totalMsgs);
|
||||||
|
expect(messagesAfterCursor.length).be.eql(totalMsgs - 6);
|
||||||
|
expect(bytesToUtf8(messagesAfterCursor[0].payload)).to.be.eq(
|
||||||
|
bytesToUtf8(messages[6].payload)
|
||||||
|
);
|
||||||
|
expect(
|
||||||
|
bytesToUtf8(messagesAfterCursor[messagesAfterCursor.length - 1].payload)
|
||||||
|
).to.be.eq(bytesToUtf8(messages[messages.length - 1].payload));
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Passing cursor with wrong message digest", async function () {
|
||||||
|
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
const messages: DecodedMessage[] = [];
|
||||||
|
for await (const page of waku.store.queryGenerator([TestDecoder])) {
|
||||||
|
for await (const msg of page.reverse()) {
|
||||||
|
messages.push(msg as DecodedMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const cursor = await createCursor(messages[5]);
|
||||||
|
|
||||||
|
// setting a wrong digest
|
||||||
|
cursor.digest = new Uint8Array([]);
|
||||||
|
|
||||||
|
const messagesAfterCursor: DecodedMessage[] = [];
|
||||||
|
try {
|
||||||
|
for await (const page of waku.store.queryGenerator([TestDecoder], {
|
||||||
|
cursor
|
||||||
|
})) {
|
||||||
|
for await (const msg of page.reverse()) {
|
||||||
|
if (msg) {
|
||||||
|
messagesAfterCursor.push(msg as DecodedMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Should return same as go-waku. Raised bug: https://github.com/waku-org/nwaku/issues/2117
|
||||||
|
expect(messagesAfterCursor.length).to.eql(0);
|
||||||
|
} catch (error) {
|
||||||
|
if (
|
||||||
|
nwaku.type() === "go-waku" &&
|
||||||
|
typeof error === "string" &&
|
||||||
|
error.includes("History response contains an Error: INVALID_CURSOR")
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw error instanceof Error
|
||||||
|
? new Error(`Unexpected error: ${error.message}`)
|
||||||
|
: error;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Passing cursor with wrong pubSubTopic", async function () {
|
||||||
|
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
const messages: DecodedMessage[] = [];
|
||||||
|
for await (const page of waku.store.queryGenerator([TestDecoder])) {
|
||||||
|
for await (const msg of page.reverse()) {
|
||||||
|
messages.push(msg as DecodedMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
messages[5].pubSubTopic = customPubSubTopic;
|
||||||
|
const cursor = await createCursor(messages[5]);
|
||||||
|
|
||||||
|
try {
|
||||||
|
for await (const page of waku.store.queryGenerator([TestDecoder], {
|
||||||
|
cursor
|
||||||
|
})) {
|
||||||
|
page;
|
||||||
|
}
|
||||||
|
throw new Error("Cursor with wrong pubsubtopic was accepted");
|
||||||
|
} catch (err) {
|
||||||
|
if (
|
||||||
|
!(err instanceof Error) ||
|
||||||
|
!err.message.includes(
|
||||||
|
`Cursor pubsub topic (${customPubSubTopic}) does not match decoder pubsub topic (${DefaultPubSubTopic})`
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
224
packages/tests/tests/store/error_handling.node.spec.ts
Normal file
224
packages/tests/tests/store/error_handling.node.spec.ts
Normal file
@ -0,0 +1,224 @@
|
|||||||
|
import { DefaultPubSubTopic } from "@waku/core";
|
||||||
|
import { IMessage, type LightNode } from "@waku/interfaces";
|
||||||
|
import { expect } from "chai";
|
||||||
|
|
||||||
|
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";
|
||||||
|
|
||||||
|
import {
|
||||||
|
customPubSubTopic,
|
||||||
|
customTestDecoder,
|
||||||
|
processQueriedMessages,
|
||||||
|
startAndConnectLightNode,
|
||||||
|
TestDecoder
|
||||||
|
} from "./utils.js";
|
||||||
|
|
||||||
|
describe("Waku Store, error handling", function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
let waku: LightNode;
|
||||||
|
let nwaku: NimGoNode;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
nwaku = new NimGoNode(makeLogFileName(this));
|
||||||
|
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
|
||||||
|
await nwaku.ensureSubscriptions();
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
await tearDownNodes(nwaku, waku);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query Generator, Wrong PubSubTopic", async function () {
|
||||||
|
try {
|
||||||
|
for await (const msgPromises of waku.store.queryGenerator([
|
||||||
|
customTestDecoder
|
||||||
|
])) {
|
||||||
|
msgPromises;
|
||||||
|
}
|
||||||
|
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||||
|
} catch (err) {
|
||||||
|
if (
|
||||||
|
!(err instanceof Error) ||
|
||||||
|
!err.message.includes(
|
||||||
|
`PubSub topic ${customPubSubTopic} has not been configured on this instance. Configured topics are: ${DefaultPubSubTopic}`
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query Generator, Multiple PubSubTopics", async function () {
|
||||||
|
try {
|
||||||
|
for await (const msgPromises of waku.store.queryGenerator([
|
||||||
|
TestDecoder,
|
||||||
|
customTestDecoder
|
||||||
|
])) {
|
||||||
|
msgPromises;
|
||||||
|
}
|
||||||
|
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||||
|
} catch (err) {
|
||||||
|
if (
|
||||||
|
!(err instanceof Error) ||
|
||||||
|
!err.message.includes(
|
||||||
|
"API does not support querying multiple pubsub topics at once"
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query Generator, No Decoder", async function () {
|
||||||
|
try {
|
||||||
|
for await (const msgPromises of waku.store.queryGenerator([])) {
|
||||||
|
msgPromises;
|
||||||
|
}
|
||||||
|
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||||
|
} catch (err) {
|
||||||
|
if (
|
||||||
|
!(err instanceof Error) ||
|
||||||
|
!err.message.includes("No decoders provided")
|
||||||
|
) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query Generator, No message returned", async function () {
|
||||||
|
const messages = await processQueriedMessages(
|
||||||
|
waku,
|
||||||
|
[TestDecoder],
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
expect(messages?.length).eq(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query with Ordered Callback, Wrong PubSubTopic", async function () {
|
||||||
|
try {
|
||||||
|
await waku.store.queryWithOrderedCallback(
|
||||||
|
[customTestDecoder],
|
||||||
|
async () => {}
|
||||||
|
);
|
||||||
|
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||||
|
} catch (err) {
|
||||||
|
if (
|
||||||
|
!(err instanceof Error) ||
|
||||||
|
!err.message.includes(
|
||||||
|
`PubSub topic ${customPubSubTopic} has not been configured on this instance. Configured topics are: ${DefaultPubSubTopic}`
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query with Ordered Callback, Multiple PubSubTopics", async function () {
|
||||||
|
try {
|
||||||
|
await waku.store.queryWithOrderedCallback(
|
||||||
|
[TestDecoder, customTestDecoder],
|
||||||
|
async () => {}
|
||||||
|
);
|
||||||
|
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||||
|
} catch (err) {
|
||||||
|
if (
|
||||||
|
!(err instanceof Error) ||
|
||||||
|
!err.message.includes(
|
||||||
|
"API does not support querying multiple pubsub topics at once"
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query with Ordered Callback, No Decoder", async function () {
|
||||||
|
try {
|
||||||
|
await waku.store.queryWithOrderedCallback([], async () => {});
|
||||||
|
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||||
|
} catch (err) {
|
||||||
|
if (
|
||||||
|
!(err instanceof Error) ||
|
||||||
|
!err.message.includes("No decoders provided")
|
||||||
|
) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query with Ordered Callback, No message returned", async function () {
|
||||||
|
const messages: IMessage[] = [];
|
||||||
|
await waku.store.queryWithOrderedCallback([TestDecoder], async (msg) => {
|
||||||
|
messages.push(msg);
|
||||||
|
});
|
||||||
|
expect(messages?.length).eq(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query with Promise Callback, Wrong PubSubTopic", async function () {
|
||||||
|
try {
|
||||||
|
await waku.store.queryWithPromiseCallback(
|
||||||
|
[customTestDecoder],
|
||||||
|
async () => {}
|
||||||
|
);
|
||||||
|
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||||
|
} catch (err) {
|
||||||
|
if (
|
||||||
|
!(err instanceof Error) ||
|
||||||
|
!err.message.includes(
|
||||||
|
`PubSub topic ${customPubSubTopic} has not been configured on this instance. Configured topics are: ${DefaultPubSubTopic}`
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query with Promise Callback, Multiple PubSubTopics", async function () {
|
||||||
|
try {
|
||||||
|
await waku.store.queryWithPromiseCallback(
|
||||||
|
[TestDecoder, customTestDecoder],
|
||||||
|
async () => {}
|
||||||
|
);
|
||||||
|
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||||
|
} catch (err) {
|
||||||
|
if (
|
||||||
|
!(err instanceof Error) ||
|
||||||
|
!err.message.includes(
|
||||||
|
"API does not support querying multiple pubsub topics at once"
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query with Promise Callback, No Decoder", async function () {
|
||||||
|
try {
|
||||||
|
await waku.store.queryWithPromiseCallback([], async () => {});
|
||||||
|
throw new Error("QueryGenerator was successful but was expected to fail");
|
||||||
|
} catch (err) {
|
||||||
|
if (
|
||||||
|
!(err instanceof Error) ||
|
||||||
|
!err.message.includes("No decoders provided")
|
||||||
|
) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query with Promise Callback, No message returned", async function () {
|
||||||
|
const messages: IMessage[] = [];
|
||||||
|
await waku.store.queryWithPromiseCallback(
|
||||||
|
[TestDecoder],
|
||||||
|
async (msgPromise) => {
|
||||||
|
const msg = await msgPromise;
|
||||||
|
if (msg) {
|
||||||
|
messages.push(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
expect(messages?.length).eq(0);
|
||||||
|
});
|
||||||
|
});
|
333
packages/tests/tests/store/index.node.spec.ts
Normal file
333
packages/tests/tests/store/index.node.spec.ts
Normal file
@ -0,0 +1,333 @@
|
|||||||
|
import {
|
||||||
|
createDecoder,
|
||||||
|
DecodedMessage,
|
||||||
|
DefaultPubSubTopic,
|
||||||
|
waitForRemotePeer
|
||||||
|
} from "@waku/core";
|
||||||
|
import type { IMessage, LightNode } from "@waku/interfaces";
|
||||||
|
import { Protocols } from "@waku/interfaces";
|
||||||
|
import {
|
||||||
|
createDecoder as createEciesDecoder,
|
||||||
|
createEncoder as createEciesEncoder,
|
||||||
|
generatePrivateKey,
|
||||||
|
getPublicKey
|
||||||
|
} from "@waku/message-encryption/ecies";
|
||||||
|
import {
|
||||||
|
createDecoder as createSymDecoder,
|
||||||
|
createEncoder as createSymEncoder,
|
||||||
|
generateSymmetricKey
|
||||||
|
} from "@waku/message-encryption/symmetric";
|
||||||
|
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import isEqual from "lodash/isEqual";
|
||||||
|
|
||||||
|
import {
|
||||||
|
delay,
|
||||||
|
makeLogFileName,
|
||||||
|
MessageCollector,
|
||||||
|
NimGoNode,
|
||||||
|
tearDownNodes,
|
||||||
|
TEST_STRING
|
||||||
|
} from "../../src/index.js";
|
||||||
|
|
||||||
|
import {
|
||||||
|
customContentTopic,
|
||||||
|
log,
|
||||||
|
messageText,
|
||||||
|
processQueriedMessages,
|
||||||
|
sendMessages,
|
||||||
|
startAndConnectLightNode,
|
||||||
|
TestContentTopic,
|
||||||
|
TestDecoder,
|
||||||
|
TestEncoder,
|
||||||
|
totalMsgs
|
||||||
|
} from "./utils.js";
|
||||||
|
|
||||||
|
const secondDecoder = createDecoder(customContentTopic, DefaultPubSubTopic);
|
||||||
|
|
||||||
|
describe("Waku Store, general", function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
let waku: LightNode;
|
||||||
|
let waku2: LightNode;
|
||||||
|
let nwaku: NimGoNode;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
nwaku = new NimGoNode(makeLogFileName(this));
|
||||||
|
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
|
||||||
|
await nwaku.ensureSubscriptions();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
await tearDownNodes(nwaku, [waku, waku2]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query generator for multiple messages", async function () {
|
||||||
|
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
const messages = await processQueriedMessages(
|
||||||
|
waku,
|
||||||
|
[TestDecoder],
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(messages?.length).eq(totalMsgs);
|
||||||
|
|
||||||
|
// checking that the message with text 0 exists
|
||||||
|
const result = messages?.findIndex((msg) => {
|
||||||
|
return msg.payload[0]! === 0;
|
||||||
|
});
|
||||||
|
expect(result).to.not.eq(-1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query generator for multiple messages with different message text format", async function () {
|
||||||
|
for (const testItem of TEST_STRING) {
|
||||||
|
expect(
|
||||||
|
await nwaku.sendMessage(
|
||||||
|
NimGoNode.toMessageRpcQuery({
|
||||||
|
payload: utf8ToBytes(testItem["value"]),
|
||||||
|
contentTopic: TestContentTopic
|
||||||
|
}),
|
||||||
|
DefaultPubSubTopic
|
||||||
|
)
|
||||||
|
).to.be.true;
|
||||||
|
await delay(1); // to ensure each timestamp is unique.
|
||||||
|
}
|
||||||
|
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
const messageCollector = new MessageCollector();
|
||||||
|
messageCollector.list = await processQueriedMessages(
|
||||||
|
waku,
|
||||||
|
[TestDecoder],
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
|
||||||
|
// checking that all message sent were retrieved
|
||||||
|
TEST_STRING.forEach((testItem) => {
|
||||||
|
expect(
|
||||||
|
messageCollector.hasMessage(TestContentTopic, testItem["value"])
|
||||||
|
).to.eq(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query generator for multiple messages with multiple decoders", async function () {
|
||||||
|
await nwaku.sendMessage(
|
||||||
|
NimGoNode.toMessageRpcQuery({
|
||||||
|
payload: utf8ToBytes("M1"),
|
||||||
|
contentTopic: TestContentTopic
|
||||||
|
}),
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
await nwaku.sendMessage(
|
||||||
|
NimGoNode.toMessageRpcQuery({
|
||||||
|
payload: utf8ToBytes("M2"),
|
||||||
|
contentTopic: customContentTopic
|
||||||
|
}),
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
const messageCollector = new MessageCollector();
|
||||||
|
messageCollector.list = await processQueriedMessages(
|
||||||
|
waku,
|
||||||
|
[TestDecoder, secondDecoder],
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
expect(messageCollector.hasMessage(TestContentTopic, "M1")).to.eq(true);
|
||||||
|
expect(messageCollector.hasMessage(customContentTopic, "M2")).to.eq(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query generator for multiple messages with different content topic format", async function () {
|
||||||
|
for (const testItem of TEST_STRING) {
|
||||||
|
expect(
|
||||||
|
await nwaku.sendMessage(
|
||||||
|
NimGoNode.toMessageRpcQuery({
|
||||||
|
payload: utf8ToBytes(messageText),
|
||||||
|
contentTopic: testItem["value"]
|
||||||
|
}),
|
||||||
|
DefaultPubSubTopic
|
||||||
|
)
|
||||||
|
).to.be.true;
|
||||||
|
await delay(1); // to ensure each timestamp is unique.
|
||||||
|
}
|
||||||
|
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
for (const testItem of TEST_STRING) {
|
||||||
|
for await (const query of waku.store.queryGenerator([
|
||||||
|
createDecoder(testItem["value"])
|
||||||
|
])) {
|
||||||
|
for await (const msg of query) {
|
||||||
|
expect(isEqual(msg!.payload, utf8ToBytes(messageText))).to.eq(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Callback on promise", async function () {
|
||||||
|
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
const messages: IMessage[] = [];
|
||||||
|
await waku.store.queryWithPromiseCallback(
|
||||||
|
[TestDecoder],
|
||||||
|
async (msgPromise) => {
|
||||||
|
const msg = await msgPromise;
|
||||||
|
if (msg) {
|
||||||
|
messages.push(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(messages?.length).eq(totalMsgs);
|
||||||
|
const result = messages?.findIndex((msg) => {
|
||||||
|
return msg.payload[0]! === 0;
|
||||||
|
});
|
||||||
|
expect(result).to.not.eq(-1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Callback on promise, aborts when callback returns true", async function () {
|
||||||
|
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
const desiredMsgs = 14;
|
||||||
|
const messages: IMessage[] = [];
|
||||||
|
await waku.store.queryWithPromiseCallback(
|
||||||
|
[TestDecoder],
|
||||||
|
async (msgPromise) => {
|
||||||
|
const msg = await msgPromise;
|
||||||
|
if (msg) {
|
||||||
|
messages.push(msg);
|
||||||
|
}
|
||||||
|
return messages.length >= desiredMsgs;
|
||||||
|
},
|
||||||
|
{ pageSize: 7 }
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(messages?.length).eq(desiredMsgs);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Generator, with asymmetric & symmetric encrypted messages", async function () {
|
||||||
|
const asymText = "This message is encrypted for me using asymmetric";
|
||||||
|
const asymTopic = "/test/1/asymmetric/proto";
|
||||||
|
const symText =
|
||||||
|
"This message is encrypted for me using symmetric encryption";
|
||||||
|
const symTopic = "/test/1/symmetric/proto";
|
||||||
|
const clearText = "This is a clear text message for everyone to read";
|
||||||
|
const otherText =
|
||||||
|
"This message is not for and I must not be able to read it";
|
||||||
|
|
||||||
|
const timestamp = new Date();
|
||||||
|
|
||||||
|
const asymMsg = { payload: utf8ToBytes(asymText), timestamp };
|
||||||
|
const symMsg = {
|
||||||
|
payload: utf8ToBytes(symText),
|
||||||
|
timestamp: new Date(timestamp.valueOf() + 1)
|
||||||
|
};
|
||||||
|
const clearMsg = {
|
||||||
|
payload: utf8ToBytes(clearText),
|
||||||
|
timestamp: new Date(timestamp.valueOf() + 2)
|
||||||
|
};
|
||||||
|
const otherMsg = {
|
||||||
|
payload: utf8ToBytes(otherText),
|
||||||
|
timestamp: new Date(timestamp.valueOf() + 3)
|
||||||
|
};
|
||||||
|
|
||||||
|
const privateKey = generatePrivateKey();
|
||||||
|
const symKey = generateSymmetricKey();
|
||||||
|
const publicKey = getPublicKey(privateKey);
|
||||||
|
|
||||||
|
const eciesEncoder = createEciesEncoder({
|
||||||
|
contentTopic: asymTopic,
|
||||||
|
publicKey
|
||||||
|
});
|
||||||
|
const symEncoder = createSymEncoder({
|
||||||
|
contentTopic: symTopic,
|
||||||
|
symKey
|
||||||
|
});
|
||||||
|
|
||||||
|
const otherEncoder = createEciesEncoder({
|
||||||
|
contentTopic: TestContentTopic,
|
||||||
|
publicKey: getPublicKey(generatePrivateKey())
|
||||||
|
});
|
||||||
|
|
||||||
|
const eciesDecoder = createEciesDecoder(asymTopic, privateKey);
|
||||||
|
const symDecoder = createSymDecoder(symTopic, symKey);
|
||||||
|
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
waku2 = await startAndConnectLightNode(nwaku);
|
||||||
|
const nimWakuMultiaddr = await nwaku.getMultiaddrWithId();
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
waku.dial(nimWakuMultiaddr),
|
||||||
|
waku2.dial(nimWakuMultiaddr)
|
||||||
|
]);
|
||||||
|
|
||||||
|
log("Waku nodes connected to nwaku");
|
||||||
|
|
||||||
|
await waitForRemotePeer(waku, [Protocols.LightPush]);
|
||||||
|
|
||||||
|
log("Sending messages using light push");
|
||||||
|
await Promise.all([
|
||||||
|
waku.lightPush.send(eciesEncoder, asymMsg),
|
||||||
|
waku.lightPush.send(symEncoder, symMsg),
|
||||||
|
waku.lightPush.send(otherEncoder, otherMsg),
|
||||||
|
waku.lightPush.send(TestEncoder, clearMsg)
|
||||||
|
]);
|
||||||
|
|
||||||
|
await waitForRemotePeer(waku2, [Protocols.Store]);
|
||||||
|
|
||||||
|
const messages: DecodedMessage[] = [];
|
||||||
|
log("Retrieve messages from store");
|
||||||
|
|
||||||
|
for await (const query of waku2.store.queryGenerator([
|
||||||
|
eciesDecoder,
|
||||||
|
symDecoder,
|
||||||
|
TestDecoder
|
||||||
|
])) {
|
||||||
|
for await (const msg of query) {
|
||||||
|
if (msg) {
|
||||||
|
messages.push(msg as DecodedMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Messages are ordered from oldest to latest within a page (1 page query)
|
||||||
|
expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText);
|
||||||
|
expect(bytesToUtf8(messages[1].payload!)).to.eq(symText);
|
||||||
|
expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText);
|
||||||
|
expect(messages?.length).eq(3);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Ordered callback, aborts when callback returns true", async function () {
|
||||||
|
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
const desiredMsgs = 14;
|
||||||
|
const messages: IMessage[] = [];
|
||||||
|
await waku.store.queryWithOrderedCallback(
|
||||||
|
[TestDecoder],
|
||||||
|
async (msg) => {
|
||||||
|
messages.push(msg);
|
||||||
|
return messages.length >= desiredMsgs;
|
||||||
|
},
|
||||||
|
{ pageSize: 7 }
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(messages?.length).eq(desiredMsgs);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Query generator for 2000 messages", async function () {
|
||||||
|
this.timeout(40000);
|
||||||
|
await sendMessages(nwaku, 2000, TestContentTopic, DefaultPubSubTopic);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
const messages = await processQueriedMessages(
|
||||||
|
waku,
|
||||||
|
[TestDecoder],
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(messages?.length).eq(2000);
|
||||||
|
});
|
||||||
|
});
|
143
packages/tests/tests/store/multiple_pubsub.spec.ts
Normal file
143
packages/tests/tests/store/multiple_pubsub.spec.ts
Normal file
@ -0,0 +1,143 @@
|
|||||||
|
import { DefaultPubSubTopic, waitForRemotePeer } from "@waku/core";
|
||||||
|
import type { IMessage, LightNode } from "@waku/interfaces";
|
||||||
|
import { createLightNode, Protocols } from "@waku/sdk";
|
||||||
|
import { expect } from "chai";
|
||||||
|
|
||||||
|
import {
|
||||||
|
makeLogFileName,
|
||||||
|
NimGoNode,
|
||||||
|
NOISE_KEY_1,
|
||||||
|
tearDownNodes
|
||||||
|
} from "../../src/index.js";
|
||||||
|
|
||||||
|
import {
|
||||||
|
customContentTopic,
|
||||||
|
customPubSubTopic,
|
||||||
|
customTestDecoder,
|
||||||
|
processQueriedMessages,
|
||||||
|
sendMessages,
|
||||||
|
startAndConnectLightNode,
|
||||||
|
TestContentTopic,
|
||||||
|
TestDecoder,
|
||||||
|
totalMsgs
|
||||||
|
} from "./utils.js";
|
||||||
|
|
||||||
|
describe("Waku Store, custom pubsub topic", function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
let waku: LightNode;
|
||||||
|
let nwaku: NimGoNode;
|
||||||
|
let nwaku2: NimGoNode;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
nwaku = new NimGoNode(makeLogFileName(this));
|
||||||
|
await nwaku.start({
|
||||||
|
store: true,
|
||||||
|
topic: [customPubSubTopic, DefaultPubSubTopic],
|
||||||
|
relay: true
|
||||||
|
});
|
||||||
|
await nwaku.ensureSubscriptions([customPubSubTopic, DefaultPubSubTopic]);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
await tearDownNodes([nwaku, nwaku2], waku);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Generator, custom pubsub topic", async function () {
|
||||||
|
await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic);
|
||||||
|
waku = await startAndConnectLightNode(nwaku, [customPubSubTopic]);
|
||||||
|
const messages = await processQueriedMessages(
|
||||||
|
waku,
|
||||||
|
[customTestDecoder],
|
||||||
|
customPubSubTopic
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(messages?.length).eq(totalMsgs);
|
||||||
|
const result = messages?.findIndex((msg) => {
|
||||||
|
return msg.payload![0]! === 0;
|
||||||
|
});
|
||||||
|
expect(result).to.not.eq(-1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Generator, 2 different pubsubtopics", async function () {
|
||||||
|
this.timeout(10000);
|
||||||
|
|
||||||
|
const totalMsgs = 10;
|
||||||
|
await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic);
|
||||||
|
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
|
||||||
|
|
||||||
|
waku = await startAndConnectLightNode(nwaku, [
|
||||||
|
customPubSubTopic,
|
||||||
|
DefaultPubSubTopic
|
||||||
|
]);
|
||||||
|
|
||||||
|
const customMessages = await processQueriedMessages(
|
||||||
|
waku,
|
||||||
|
[customTestDecoder],
|
||||||
|
customPubSubTopic
|
||||||
|
);
|
||||||
|
expect(customMessages?.length).eq(totalMsgs);
|
||||||
|
const result1 = customMessages?.findIndex((msg) => {
|
||||||
|
return msg.payload![0]! === 0;
|
||||||
|
});
|
||||||
|
expect(result1).to.not.eq(-1);
|
||||||
|
|
||||||
|
const testMessages = await processQueriedMessages(
|
||||||
|
waku,
|
||||||
|
[TestDecoder],
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
expect(testMessages?.length).eq(totalMsgs);
|
||||||
|
const result2 = testMessages?.findIndex((msg) => {
|
||||||
|
return msg.payload![0]! === 0;
|
||||||
|
});
|
||||||
|
expect(result2).to.not.eq(-1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () {
|
||||||
|
this.timeout(10000);
|
||||||
|
|
||||||
|
// Set up and start a new nwaku node with Default PubSubtopic
|
||||||
|
nwaku2 = new NimGoNode(makeLogFileName(this) + "2");
|
||||||
|
await nwaku2.start({
|
||||||
|
store: true,
|
||||||
|
topic: [DefaultPubSubTopic],
|
||||||
|
relay: true
|
||||||
|
});
|
||||||
|
await nwaku2.ensureSubscriptions([DefaultPubSubTopic]);
|
||||||
|
|
||||||
|
const totalMsgs = 10;
|
||||||
|
await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic);
|
||||||
|
await sendMessages(nwaku2, totalMsgs, TestContentTopic, DefaultPubSubTopic);
|
||||||
|
|
||||||
|
waku = await createLightNode({
|
||||||
|
staticNoiseKey: NOISE_KEY_1,
|
||||||
|
pubSubTopics: [customPubSubTopic, DefaultPubSubTopic]
|
||||||
|
});
|
||||||
|
await waku.start();
|
||||||
|
|
||||||
|
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||||
|
await waku.dial(await nwaku2.getMultiaddrWithId());
|
||||||
|
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||||
|
|
||||||
|
let customMessages: IMessage[] = [];
|
||||||
|
let testMessages: IMessage[] = [];
|
||||||
|
|
||||||
|
while (
|
||||||
|
customMessages.length != totalMsgs ||
|
||||||
|
testMessages.length != totalMsgs
|
||||||
|
) {
|
||||||
|
customMessages = await processQueriedMessages(
|
||||||
|
waku,
|
||||||
|
[customTestDecoder],
|
||||||
|
customPubSubTopic
|
||||||
|
);
|
||||||
|
testMessages = await processQueriedMessages(
|
||||||
|
waku,
|
||||||
|
[TestDecoder],
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
129
packages/tests/tests/store/order.node.spec.ts
Normal file
129
packages/tests/tests/store/order.node.spec.ts
Normal file
@ -0,0 +1,129 @@
|
|||||||
|
import { DecodedMessage, DefaultPubSubTopic, PageDirection } from "@waku/core";
|
||||||
|
import type { IMessage, LightNode } from "@waku/interfaces";
|
||||||
|
import { expect } from "chai";
|
||||||
|
|
||||||
|
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";
|
||||||
|
|
||||||
|
import {
|
||||||
|
chunkAndReverseArray,
|
||||||
|
sendMessages,
|
||||||
|
startAndConnectLightNode,
|
||||||
|
TestContentTopic,
|
||||||
|
TestDecoder,
|
||||||
|
totalMsgs
|
||||||
|
} from "./utils.js";
|
||||||
|
|
||||||
|
describe("Waku Store, order", function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
let waku: LightNode;
|
||||||
|
let nwaku: NimGoNode;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
nwaku = new NimGoNode(makeLogFileName(this));
|
||||||
|
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
|
||||||
|
await nwaku.ensureSubscriptions();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
await tearDownNodes(nwaku, waku);
|
||||||
|
});
|
||||||
|
|
||||||
|
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
|
||||||
|
it(`Query Generator - ${pageDirection}`, async function () {
|
||||||
|
await sendMessages(
|
||||||
|
nwaku,
|
||||||
|
totalMsgs,
|
||||||
|
TestContentTopic,
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
const messages: IMessage[] = [];
|
||||||
|
for await (const query of waku.store.queryGenerator([TestDecoder], {
|
||||||
|
pageDirection: pageDirection
|
||||||
|
})) {
|
||||||
|
for await (const msg of query) {
|
||||||
|
if (msg) {
|
||||||
|
messages.push(msg as DecodedMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let expectedPayloads = Array.from(Array(totalMsgs).keys());
|
||||||
|
if (pageDirection === PageDirection.BACKWARD) {
|
||||||
|
expectedPayloads = chunkAndReverseArray(expectedPayloads, 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(messages?.length).eq(totalMsgs);
|
||||||
|
const payloads = messages.map((msg) => msg.payload[0]!);
|
||||||
|
expect(payloads).to.deep.eq(expectedPayloads);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
|
||||||
|
it(`Promise Callback - ${pageDirection}`, async function () {
|
||||||
|
await sendMessages(
|
||||||
|
nwaku,
|
||||||
|
totalMsgs,
|
||||||
|
TestContentTopic,
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
const messages: IMessage[] = [];
|
||||||
|
await waku.store.queryWithPromiseCallback(
|
||||||
|
[TestDecoder],
|
||||||
|
async (msgPromise) => {
|
||||||
|
const msg = await msgPromise;
|
||||||
|
if (msg) {
|
||||||
|
messages.push(msg);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
pageDirection: pageDirection
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let expectedPayloads = Array.from(Array(totalMsgs).keys());
|
||||||
|
if (pageDirection === PageDirection.BACKWARD) {
|
||||||
|
expectedPayloads = chunkAndReverseArray(expectedPayloads, 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(messages?.length).eq(totalMsgs);
|
||||||
|
const payloads = messages.map((msg) => msg.payload[0]!);
|
||||||
|
expect(payloads).to.deep.eq(expectedPayloads);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
|
||||||
|
it(`Ordered Callback - ${pageDirection}`, async function () {
|
||||||
|
await sendMessages(
|
||||||
|
nwaku,
|
||||||
|
totalMsgs,
|
||||||
|
TestContentTopic,
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
const messages: IMessage[] = [];
|
||||||
|
await waku.store.queryWithOrderedCallback(
|
||||||
|
[TestDecoder],
|
||||||
|
async (msg) => {
|
||||||
|
messages.push(msg);
|
||||||
|
},
|
||||||
|
{
|
||||||
|
pageDirection: pageDirection
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
if (pageDirection === PageDirection.BACKWARD) {
|
||||||
|
messages.reverse();
|
||||||
|
}
|
||||||
|
expect(messages?.length).eq(totalMsgs);
|
||||||
|
const payloads = messages.map((msg) => msg.payload[0]!);
|
||||||
|
expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys()));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
99
packages/tests/tests/store/page_size.node.spec.ts
Normal file
99
packages/tests/tests/store/page_size.node.spec.ts
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
import { DefaultPubSubTopic } from "@waku/core";
|
||||||
|
import type { LightNode } from "@waku/interfaces";
|
||||||
|
import { expect } from "chai";
|
||||||
|
|
||||||
|
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";
|
||||||
|
|
||||||
|
import {
|
||||||
|
sendMessages,
|
||||||
|
startAndConnectLightNode,
|
||||||
|
TestContentTopic,
|
||||||
|
TestDecoder
|
||||||
|
} from "./utils.js";
|
||||||
|
|
||||||
|
describe("Waku Store, page size", function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
let waku: LightNode;
|
||||||
|
let nwaku: NimGoNode;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
nwaku = new NimGoNode(makeLogFileName(this));
|
||||||
|
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
|
||||||
|
await nwaku.ensureSubscriptions();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
await tearDownNodes(nwaku, waku);
|
||||||
|
});
|
||||||
|
|
||||||
|
[
|
||||||
|
[0, 110],
|
||||||
|
[1, 4],
|
||||||
|
[3, 20],
|
||||||
|
[10, 10],
|
||||||
|
[11, 10],
|
||||||
|
[19, 20],
|
||||||
|
[110, 120]
|
||||||
|
].forEach(([pageSize, messageCount]) => {
|
||||||
|
it(`Passing page size ${pageSize} when there are ${messageCount} messages`, async function () {
|
||||||
|
await sendMessages(
|
||||||
|
nwaku,
|
||||||
|
messageCount,
|
||||||
|
TestContentTopic,
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
|
||||||
|
// Determine effectivePageSize for test expectations
|
||||||
|
let effectivePageSize = pageSize;
|
||||||
|
if (pageSize === 0) {
|
||||||
|
if (nwaku.type() == "go-waku") {
|
||||||
|
effectivePageSize = 100;
|
||||||
|
} else {
|
||||||
|
effectivePageSize = 20;
|
||||||
|
}
|
||||||
|
} else if (pageSize > 100) {
|
||||||
|
effectivePageSize = 100;
|
||||||
|
}
|
||||||
|
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
let messagesRetrieved = 0;
|
||||||
|
for await (const query of waku.store.queryGenerator([TestDecoder], {
|
||||||
|
pageSize: pageSize
|
||||||
|
})) {
|
||||||
|
// Calculate expected page size
|
||||||
|
const expectedPageSize = Math.min(
|
||||||
|
effectivePageSize,
|
||||||
|
messageCount - messagesRetrieved
|
||||||
|
);
|
||||||
|
expect(query.length).eq(expectedPageSize);
|
||||||
|
|
||||||
|
for await (const msg of query) {
|
||||||
|
if (msg) {
|
||||||
|
messagesRetrieved++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(messagesRetrieved).eq(messageCount);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// Possible issue here because pageSize differs across implementations
|
||||||
|
it("Default pageSize", async function () {
|
||||||
|
await sendMessages(nwaku, 20, TestContentTopic, DefaultPubSubTopic);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
let messagesRetrieved = 0;
|
||||||
|
for await (const query of waku.store.queryGenerator([TestDecoder])) {
|
||||||
|
expect(query.length).eq(10);
|
||||||
|
for await (const msg of query) {
|
||||||
|
if (msg) {
|
||||||
|
messagesRetrieved++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
expect(messagesRetrieved).eq(20);
|
||||||
|
});
|
||||||
|
});
|
110
packages/tests/tests/store/sorting.node.spec.ts
Normal file
110
packages/tests/tests/store/sorting.node.spec.ts
Normal file
@ -0,0 +1,110 @@
|
|||||||
|
import { DecodedMessage, DefaultPubSubTopic, PageDirection } from "@waku/core";
|
||||||
|
import type { IMessage, LightNode } from "@waku/interfaces";
|
||||||
|
|
||||||
|
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";
|
||||||
|
|
||||||
|
import {
|
||||||
|
sendMessages,
|
||||||
|
startAndConnectLightNode,
|
||||||
|
TestContentTopic,
|
||||||
|
TestDecoder,
|
||||||
|
totalMsgs
|
||||||
|
} from "./utils.js";
|
||||||
|
|
||||||
|
describe("Waku Store, sorting", function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
let waku: LightNode;
|
||||||
|
let nwaku: NimGoNode;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
nwaku = new NimGoNode(makeLogFileName(this));
|
||||||
|
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
|
||||||
|
await nwaku.ensureSubscriptions();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
await tearDownNodes(nwaku, waku);
|
||||||
|
});
|
||||||
|
|
||||||
|
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
|
||||||
|
it(`Query Generator sorting by timestamp while page direction is ${pageDirection}`, async function () {
|
||||||
|
await sendMessages(
|
||||||
|
nwaku,
|
||||||
|
totalMsgs,
|
||||||
|
TestContentTopic,
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
for await (const query of waku.store.queryGenerator([TestDecoder], {
|
||||||
|
pageDirection: PageDirection.FORWARD
|
||||||
|
})) {
|
||||||
|
const page: IMessage[] = [];
|
||||||
|
for await (const msg of query) {
|
||||||
|
if (msg) {
|
||||||
|
page.push(msg as DecodedMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Extract timestamps
|
||||||
|
const timestamps = page.map(
|
||||||
|
(msg) => msg.timestamp as unknown as bigint
|
||||||
|
);
|
||||||
|
// Check if timestamps are sorted
|
||||||
|
for (let i = 1; i < timestamps.length; i++) {
|
||||||
|
if (timestamps[i] < timestamps[i - 1]) {
|
||||||
|
throw new Error(
|
||||||
|
`Messages are not sorted by timestamp. Found out of order at index ${i}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
|
||||||
|
it(`Ordered Callback sorting by timestamp while page direction is ${pageDirection}`, async function () {
|
||||||
|
await sendMessages(
|
||||||
|
nwaku,
|
||||||
|
totalMsgs,
|
||||||
|
TestContentTopic,
|
||||||
|
DefaultPubSubTopic
|
||||||
|
);
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
const messages: IMessage[] = [];
|
||||||
|
await waku.store.queryWithOrderedCallback(
|
||||||
|
[TestDecoder],
|
||||||
|
async (msg) => {
|
||||||
|
messages.push(msg);
|
||||||
|
},
|
||||||
|
{
|
||||||
|
pageDirection: pageDirection
|
||||||
|
}
|
||||||
|
);
|
||||||
|
// Extract timestamps
|
||||||
|
const timestamps = messages.map(
|
||||||
|
(msg) => msg.timestamp as unknown as bigint
|
||||||
|
);
|
||||||
|
// Check if timestamps are sorted
|
||||||
|
for (let i = 1; i < timestamps.length; i++) {
|
||||||
|
if (
|
||||||
|
pageDirection === PageDirection.FORWARD &&
|
||||||
|
timestamps[i] < timestamps[i - 1]
|
||||||
|
) {
|
||||||
|
throw new Error(
|
||||||
|
`Messages are not sorted by timestamp in FORWARD direction. Found out of order at index ${i}`
|
||||||
|
);
|
||||||
|
} else if (
|
||||||
|
pageDirection === PageDirection.BACKWARD &&
|
||||||
|
timestamps[i] > timestamps[i - 1]
|
||||||
|
) {
|
||||||
|
throw new Error(
|
||||||
|
`Messages are not sorted by timestamp in BACKWARD direction. Found out of order at index ${i}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
119
packages/tests/tests/store/time_filter.node.spec.ts
Normal file
119
packages/tests/tests/store/time_filter.node.spec.ts
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
import type { IMessage, LightNode } from "@waku/interfaces";
|
||||||
|
import { expect } from "chai";
|
||||||
|
|
||||||
|
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";
|
||||||
|
|
||||||
|
import {
|
||||||
|
adjustDate,
|
||||||
|
startAndConnectLightNode,
|
||||||
|
TestContentTopic,
|
||||||
|
TestDecoder
|
||||||
|
} from "./utils.js";
|
||||||
|
|
||||||
|
describe("Waku Store, time filter", function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
let waku: LightNode;
|
||||||
|
let nwaku: NimGoNode;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
nwaku = new NimGoNode(makeLogFileName(this));
|
||||||
|
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
|
||||||
|
await nwaku.ensureSubscriptions();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
await tearDownNodes(nwaku, waku);
|
||||||
|
});
|
||||||
|
|
||||||
|
[
|
||||||
|
[-19000, -10, 10],
|
||||||
|
[-19000, 1, 4],
|
||||||
|
[-19000, -2, -1],
|
||||||
|
// [-19000, 0, 1000], // skipped for now because it fails on gowaku which returns messages > startTime
|
||||||
|
[-19000, -1000, 0],
|
||||||
|
[19000, -10, 10], // message in the future
|
||||||
|
[-19000, 10, -10] // startTime is newer than endTime
|
||||||
|
].forEach(([msgTime, startTime, endTime]) => {
|
||||||
|
it(`msgTime: ${msgTime} ms from now, startTime: ${
|
||||||
|
msgTime + startTime
|
||||||
|
}, endTime: ${msgTime + endTime}`, async function () {
|
||||||
|
const msgTimestamp = adjustDate(new Date(), msgTime);
|
||||||
|
expect(
|
||||||
|
await nwaku.sendMessage(
|
||||||
|
NimGoNode.toMessageRpcQuery({
|
||||||
|
payload: new Uint8Array([0]),
|
||||||
|
contentTopic: TestContentTopic,
|
||||||
|
timestamp: msgTimestamp
|
||||||
|
})
|
||||||
|
)
|
||||||
|
).to.be.true;
|
||||||
|
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
const messages: IMessage[] = [];
|
||||||
|
await waku.store.queryWithOrderedCallback(
|
||||||
|
[TestDecoder],
|
||||||
|
(msg) => {
|
||||||
|
if (msg) {
|
||||||
|
messages.push(msg);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
timeFilter: {
|
||||||
|
startTime: adjustDate(msgTimestamp, startTime),
|
||||||
|
endTime: adjustDate(msgTimestamp, endTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// in this context 0 is the messageTimestamp
|
||||||
|
if (
|
||||||
|
(startTime > 0 && endTime > 0) ||
|
||||||
|
(startTime < 0 && endTime < 0) ||
|
||||||
|
startTime > endTime
|
||||||
|
) {
|
||||||
|
expect(messages.length).eq(0);
|
||||||
|
} else {
|
||||||
|
expect(messages.length).eq(1);
|
||||||
|
expect(messages[0].payload![0]!).eq(0);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
[-20000, 40000].forEach((msgTime) => {
|
||||||
|
it(`Timestamp too far from node time: ${msgTime} ms from now`, async function () {
|
||||||
|
const msgTimestamp = adjustDate(new Date(), msgTime);
|
||||||
|
expect(
|
||||||
|
await nwaku.sendMessage(
|
||||||
|
NimGoNode.toMessageRpcQuery({
|
||||||
|
payload: new Uint8Array([0]),
|
||||||
|
contentTopic: TestContentTopic,
|
||||||
|
timestamp: msgTimestamp
|
||||||
|
})
|
||||||
|
)
|
||||||
|
).to.be.true;
|
||||||
|
|
||||||
|
waku = await startAndConnectLightNode(nwaku);
|
||||||
|
|
||||||
|
const messages: IMessage[] = [];
|
||||||
|
await waku.store.queryWithOrderedCallback(
|
||||||
|
[TestDecoder],
|
||||||
|
(msg) => {
|
||||||
|
if (msg) {
|
||||||
|
messages.push(msg);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
timeFilter: {
|
||||||
|
startTime: adjustDate(msgTimestamp, -1000),
|
||||||
|
endTime: adjustDate(msgTimestamp, 1000)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(messages.length).eq(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
97
packages/tests/tests/store/utils.ts
Normal file
97
packages/tests/tests/store/utils.ts
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
import {
|
||||||
|
createDecoder,
|
||||||
|
createEncoder,
|
||||||
|
DecodedMessage,
|
||||||
|
Decoder,
|
||||||
|
DefaultPubSubTopic,
|
||||||
|
waitForRemotePeer
|
||||||
|
} from "@waku/core";
|
||||||
|
import { LightNode, Protocols } from "@waku/interfaces";
|
||||||
|
import { createLightNode } from "@waku/sdk";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import debug from "debug";
|
||||||
|
|
||||||
|
import { delay, NimGoNode, NOISE_KEY_1 } from "../../src";
|
||||||
|
|
||||||
|
export const log = debug("waku:test:store");
|
||||||
|
|
||||||
|
export const TestContentTopic = "/test/1/waku-store/utf8";
|
||||||
|
export const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
|
||||||
|
export const TestDecoder = createDecoder(TestContentTopic);
|
||||||
|
export const customContentTopic = "/test/2/waku-store/utf8";
|
||||||
|
export const customPubSubTopic = "/waku/2/custom-dapp/proto";
|
||||||
|
export const customTestDecoder = createDecoder(
|
||||||
|
customContentTopic,
|
||||||
|
customPubSubTopic
|
||||||
|
);
|
||||||
|
export const totalMsgs = 20;
|
||||||
|
export const messageText = "Store Push works!";
|
||||||
|
|
||||||
|
export async function sendMessages(
|
||||||
|
instance: NimGoNode,
|
||||||
|
numMessages: number,
|
||||||
|
contentTopic: string,
|
||||||
|
pubSubTopic: string
|
||||||
|
): Promise<void> {
|
||||||
|
for (let i = 0; i < numMessages; i++) {
|
||||||
|
expect(
|
||||||
|
await instance.sendMessage(
|
||||||
|
NimGoNode.toMessageRpcQuery({
|
||||||
|
payload: new Uint8Array([i]),
|
||||||
|
contentTopic: contentTopic
|
||||||
|
}),
|
||||||
|
pubSubTopic
|
||||||
|
)
|
||||||
|
).to.be.true;
|
||||||
|
await delay(1); // to ensure each timestamp is unique.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function processQueriedMessages(
|
||||||
|
instance: LightNode,
|
||||||
|
decoders: Array<Decoder>,
|
||||||
|
expectedTopic?: string
|
||||||
|
): Promise<DecodedMessage[]> {
|
||||||
|
const localMessages: DecodedMessage[] = [];
|
||||||
|
for await (const query of instance.store.queryGenerator(decoders)) {
|
||||||
|
for await (const msg of query) {
|
||||||
|
if (msg) {
|
||||||
|
expect(msg.pubSubTopic).to.eq(expectedTopic);
|
||||||
|
localMessages.push(msg as DecodedMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return localMessages;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function startAndConnectLightNode(
|
||||||
|
instance: NimGoNode,
|
||||||
|
pubSubTopics: string[] = [DefaultPubSubTopic]
|
||||||
|
): Promise<LightNode> {
|
||||||
|
const waku = await createLightNode({
|
||||||
|
pubSubTopics: pubSubTopics,
|
||||||
|
staticNoiseKey: NOISE_KEY_1
|
||||||
|
});
|
||||||
|
await waku.start();
|
||||||
|
await waku.dial(await instance.getMultiaddrWithId());
|
||||||
|
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||||
|
log("Waku node created");
|
||||||
|
return waku;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function chunkAndReverseArray(
|
||||||
|
arr: number[],
|
||||||
|
chunkSize: number
|
||||||
|
): number[] {
|
||||||
|
const result: number[] = [];
|
||||||
|
for (let i = 0; i < arr.length; i += chunkSize) {
|
||||||
|
result.push(...arr.slice(i, i + chunkSize).reverse());
|
||||||
|
}
|
||||||
|
return result.reverse();
|
||||||
|
}
|
||||||
|
|
||||||
|
export const adjustDate = (baseDate: Date, adjustMs: number): Date => {
|
||||||
|
const adjusted = new Date(baseDate);
|
||||||
|
adjusted.setTime(adjusted.getTime() + adjustMs);
|
||||||
|
return adjusted;
|
||||||
|
};
|
Loading…
x
Reference in New Issue
Block a user