intermediat commit

This commit is contained in:
fbarbu15 2023-09-29 14:03:43 +03:00
parent a3c45b6e1a
commit f307e9b6c6
No known key found for this signature in database
GPG Key ID: D75221C8DEA22501
13 changed files with 725 additions and 30 deletions

View File

@ -168,8 +168,8 @@ export class NimGoNode {
async startWithRetries(
args: Args,
options: {
retries: number;
}
retries?: number;
} = { retries: 3 }
): Promise<void> {
await pRetry(
async () => {

View File

@ -1,23 +1,46 @@
import { LightNode } from "@waku/interfaces";
import debug from "debug";
import pRetry from "p-retry";
import { NimGoNode } from "./index.js";
const log = debug("waku:test");
export function tearDownNodes(
export async function tearDownNodes(
nwakuNodes: NimGoNode[],
wakuNodes: LightNode[]
): void {
nwakuNodes.forEach((nwaku) => {
): Promise<void> {
const stopNwakuNodes = nwakuNodes.map(async (nwaku) => {
if (nwaku) {
nwaku.stop().catch((e) => log("Nwaku failed to stop", e));
await pRetry(
async () => {
try {
await nwaku.stop();
} catch (error) {
log("Nwaku failed to stop:", error);
throw error;
}
},
{ retries: 3 }
);
}
});
wakuNodes.forEach((waku) => {
const stopWakuNodes = wakuNodes.map(async (waku) => {
if (waku) {
waku.stop().catch((e) => log("Waku failed to stop", e));
await pRetry(
async () => {
try {
await waku.stop();
} catch (error) {
log("Waku failed to stop:", error);
throw error;
}
},
{ retries: 3 }
);
}
});
await Promise.all([...stopNwakuNodes, ...stopWakuNodes]);
}

View File

@ -28,7 +28,8 @@ describe("Waku Filter V2: Ping", function () {
});
this.afterEach(async function () {
tearDownNodes([nwaku], [waku]);
this.timeout(15000);
await tearDownNodes([nwaku], [waku]);
});
it("Ping on subscribed peer", async function () {

View File

@ -37,7 +37,8 @@ describe("Waku Filter V2: FilterPush", function () {
});
this.afterEach(async function () {
tearDownNodes([nwaku], [waku]);
this.timeout(15000);
await tearDownNodes([nwaku], [waku]);
});
TEST_STRING.forEach((testItem) => {

View File

@ -48,7 +48,8 @@ describe("Waku Filter V2: Subscribe", function () {
});
this.afterEach(async function () {
tearDownNodes([nwaku, nwaku2], [waku]);
this.timeout(15000);
await tearDownNodes([nwaku, nwaku2], [waku]);
});
it("Subscribe and receive messages via lightPush", async function () {

View File

@ -34,7 +34,8 @@ describe("Waku Filter V2: Unsubscribe", function () {
});
this.afterEach(async function () {
tearDownNodes([nwaku], [waku]);
this.timeout(15000);
await tearDownNodes([nwaku], [waku]);
});
it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {

View File

@ -67,14 +67,7 @@ export async function runNodes(
currentTest: Context
): Promise<[NimGoNode, LightNode]> {
const nwaku = new NimGoNode(makeLogFileName(currentTest));
await nwaku.startWithRetries(
{
filter: true,
lightpush: true,
relay: true
},
{ retries: 3 }
);
await nwaku.startWithRetries({ filter: true, lightpush: true, relay: true });
let waku: LightNode | undefined;
try {

View File

@ -26,7 +26,8 @@ describe("Waku Light Push [node only] - custom pubsub topic", function () {
});
this.afterEach(async function () {
tearDownNodes([nwaku], [waku]);
this.timeout(15000);
await tearDownNodes([nwaku], [waku]);
});
it("Push message", async function () {

View File

@ -39,7 +39,8 @@ describe("Waku Light Push [node only]", function () {
});
this.afterEach(async function () {
tearDownNodes([nwaku], [waku]);
this.timeout(15000);
await tearDownNodes([nwaku], [waku]);
});
TEST_STRING.forEach((testItem) => {

View File

@ -18,14 +18,11 @@ export async function runNodes(
): Promise<[NimGoNode, LightNode]> {
const nwakuOptional = pubSubTopic ? { topic: pubSubTopic } : {};
const nwaku = new NimGoNode(makeLogFileName(context));
await nwaku.startWithRetries(
{
lightpush: true,
relay: true,
...nwakuOptional
},
{ retries: 3 }
);
await nwaku.startWithRetries({
lightpush: true,
relay: true,
...nwakuOptional
});
let waku: LightNode | undefined;
try {

View File

@ -0,0 +1,87 @@
import { createDecoder, waitForRemotePeer } from "@waku/core";
import type { IMessage, LightNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { expect } from "chai";
import {
makeLogFileName,
NimGoNode,
NOISE_KEY_1,
tearDownNodes
} from "../../src/index.js";
const customPubSubTopic = "/waku/2/custom-dapp/proto";
const TestContentTopic = "/test/1/waku-store/utf8";
const CustomPubSubTestDecoder = createDecoder(
TestContentTopic,
customPubSubTopic
);
describe("Waku Store, custom pubsub topic", () => {
let waku: LightNode;
let nwaku: NimGoNode;
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.startWithRetries({
store: true,
topic: customPubSubTopic,
relay: true
});
});
afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku], [waku]);
});
it("Generator, custom pubsub topic", async function () {
this.timeout(15000);
const totalMsgs = 20;
for (let i = 0; i < totalMsgs; i++) {
expect(
await nwaku.sendMessage(
NimGoNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: TestContentTopic
}),
customPubSubTopic
)
).to.be.true;
}
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
pubSubTopics: [customPubSubTopic]
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const messages: IMessage[] = [];
let promises: Promise<void>[] = [];
for await (const msgPromises of waku.store.queryGenerator([
CustomPubSubTestDecoder
])) {
const _promises = msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {
messages.push(msg);
expect(msg.pubSubTopic).to.eq(customPubSubTopic);
}
});
promises = promises.concat(_promises);
}
await Promise.all(promises);
expect(messages?.length).eq(totalMsgs);
const result = messages?.findIndex((msg) => {
return msg.payload![0]! === 0;
});
expect(result).to.not.eq(-1);
});
});

View File

@ -0,0 +1,529 @@
import {
createCursor,
createDecoder,
createEncoder,
DecodedMessage,
DefaultPubSubTopic,
PageDirection,
waitForRemotePeer
} from "@waku/core";
import type { IMessage, LightNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import {
createDecoder as createEciesDecoder,
createEncoder as createEciesEncoder,
generatePrivateKey,
getPublicKey
} from "@waku/message-encryption/ecies";
import {
createDecoder as createSymDecoder,
createEncoder as createSymEncoder,
generateSymmetricKey
} from "@waku/message-encryption/symmetric";
import { createLightNode } from "@waku/sdk";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import debug from "debug";
import {
delay,
makeLogFileName,
NimGoNode,
NOISE_KEY_1,
NOISE_KEY_2,
tearDownNodes
} from "../../src/index.js";
import {
processMessages,
sendMessages,
startAndConnectLightNode
} from "./utils.js";
const log = debug("waku:test:store");
const TestContentTopic = "/test/1/waku-store/utf8";
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
const TestDecoder = createDecoder(TestContentTopic);
describe.only("Waku Store", function () {
this.timeout(15000);
let waku: LightNode;
let nwaku: NimGoNode;
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.startWithRetries({ store: true, lightpush: true, relay: true });
});
afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku], [waku]);
});
it("Generator", async function () {
const totalMsgs = 20;
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
waku = await startAndConnectLightNode(nwaku);
const messages = await processMessages(
waku,
[TestDecoder],
DefaultPubSubTopic
);
expect(messages?.length).eq(totalMsgs);
const result = messages?.findIndex((msg) => {
return msg.payload[0]! === 0;
});
expect(result).to.not.eq(-1);
});
it.only("Generator, no message returned", async function () {
waku = await startAndConnectLightNode(nwaku);
const messages = await processMessages(
waku,
[TestDecoder],
DefaultPubSubTopic
);
expect(messages?.length).eq(0);
});
it("Passing a cursor", async function () {
this.timeout(4_000);
const totalMsgs = 20;
for (let i = 0; i < totalMsgs; i++) {
expect(
await nwaku.sendMessage(
NimGoNode.toMessageRpcQuery({
payload: utf8ToBytes(`Message ${i}`),
contentTopic: TestContentTopic
})
)
).to.be.true;
}
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const query = waku.store.queryGenerator([TestDecoder]);
// messages in reversed order (first message at last index)
const messages: DecodedMessage[] = [];
for await (const page of query) {
for await (const msg of page.reverse()) {
messages.push(msg as DecodedMessage);
}
}
// index 2 would mean the third last message sent
const cursorIndex = 2;
// create cursor to extract messages after the 3rd index
const cursor = await createCursor(messages[cursorIndex]);
const messagesAfterCursor: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder], {
cursor
})) {
for await (const msg of page.reverse()) {
messagesAfterCursor.push(msg as DecodedMessage);
}
}
const testMessage = messagesAfterCursor[0];
expect(messages.length).be.eq(totalMsgs);
expect(bytesToUtf8(testMessage.payload)).to.be.eq(
bytesToUtf8(messages[cursorIndex + 1].payload)
);
});
it("Callback on promise", async function () {
this.timeout(15_000);
const totalMsgs = 15;
for (let i = 0; i < totalMsgs; i++) {
expect(
await nwaku.sendMessage(
NimGoNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: TestContentTopic
})
)
).to.be.true;
}
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const messages: IMessage[] = [];
await waku.store.queryWithPromiseCallback(
[TestDecoder],
async (msgPromise) => {
const msg = await msgPromise;
if (msg) {
messages.push(msg);
}
}
);
expect(messages?.length).eq(totalMsgs);
const result = messages?.findIndex((msg) => {
return msg.payload[0]! === 0;
});
expect(result).to.not.eq(-1);
});
it("Callback on promise, aborts when callback returns true", async function () {
this.timeout(15_000);
const totalMsgs = 20;
for (let i = 0; i < totalMsgs; i++) {
expect(
await nwaku.sendMessage(
NimGoNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: TestContentTopic
})
)
).to.be.true;
}
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const desiredMsgs = 14;
const messages: IMessage[] = [];
await waku.store.queryWithPromiseCallback(
[TestDecoder],
async (msgPromise) => {
const msg = await msgPromise;
if (msg) {
messages.push(msg);
}
return messages.length >= desiredMsgs;
},
{ pageSize: 7 }
);
expect(messages?.length).eq(desiredMsgs);
});
it("Ordered Callback - Forward", async function () {
this.timeout(15_000);
const totalMsgs = 18;
for (let i = 0; i < totalMsgs; i++) {
expect(
await nwaku.sendMessage(
NimGoNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: TestContentTopic
})
)
).to.be.true;
await delay(1); // to ensure each timestamp is unique.
}
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const messages: IMessage[] = [];
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
},
{
pageDirection: PageDirection.FORWARD
}
);
expect(messages?.length).eq(totalMsgs);
const payloads = messages.map((msg) => msg.payload[0]!);
expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys()));
});
it("Ordered Callback - Backward", async function () {
this.timeout(15_000);
const totalMsgs = 18;
for (let i = 0; i < totalMsgs; i++) {
expect(
await nwaku.sendMessage(
NimGoNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: TestContentTopic
})
)
).to.be.true;
await delay(1); // to ensure each timestamp is unique.
}
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
let messages: IMessage[] = [];
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
},
{
pageDirection: PageDirection.BACKWARD
}
);
messages = messages.reverse();
expect(messages?.length).eq(totalMsgs);
const payloads = messages.map((msg) => msg.payload![0]!);
expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys()));
});
it("Generator, with asymmetric & symmetric encrypted messages", async function () {
this.timeout(15_000);
const asymText = "This message is encrypted for me using asymmetric";
const asymTopic = "/test/1/asymmetric/proto";
const symText =
"This message is encrypted for me using symmetric encryption";
const symTopic = "/test/1/symmetric/proto";
const clearText = "This is a clear text message for everyone to read";
const otherText =
"This message is not for and I must not be able to read it";
const timestamp = new Date();
const asymMsg = { payload: utf8ToBytes(asymText), timestamp };
const symMsg = {
payload: utf8ToBytes(symText),
timestamp: new Date(timestamp.valueOf() + 1)
};
const clearMsg = {
payload: utf8ToBytes(clearText),
timestamp: new Date(timestamp.valueOf() + 2)
};
const otherMsg = {
payload: utf8ToBytes(otherText),
timestamp: new Date(timestamp.valueOf() + 3)
};
const privateKey = generatePrivateKey();
const symKey = generateSymmetricKey();
const publicKey = getPublicKey(privateKey);
const eciesEncoder = createEciesEncoder({
contentTopic: asymTopic,
publicKey
});
const symEncoder = createSymEncoder({
contentTopic: symTopic,
symKey
});
const otherEncoder = createEciesEncoder({
contentTopic: TestContentTopic,
publicKey: getPublicKey(generatePrivateKey())
});
const eciesDecoder = createEciesDecoder(asymTopic, privateKey);
const symDecoder = createSymDecoder(symTopic, symKey);
const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([
createLightNode({
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createLightNode({
staticNoiseKey: NOISE_KEY_2
}).then((waku) => waku.start().then(() => waku)),
nwaku.getMultiaddrWithId()
]);
log("Waku nodes created");
await Promise.all([
waku1.dial(nimWakuMultiaddr),
waku2.dial(nimWakuMultiaddr)
]);
log("Waku nodes connected to nwaku");
await waitForRemotePeer(waku1, [Protocols.LightPush]);
log("Sending messages using light push");
await Promise.all([
waku1.lightPush.send(eciesEncoder, asymMsg),
waku1.lightPush.send(symEncoder, symMsg),
waku1.lightPush.send(otherEncoder, otherMsg),
waku1.lightPush.send(TestEncoder, clearMsg)
]);
await waitForRemotePeer(waku2, [Protocols.Store]);
const messages: DecodedMessage[] = [];
log("Retrieve messages from store");
for await (const msgPromises of waku2.store.queryGenerator([
eciesDecoder,
symDecoder,
TestDecoder
])) {
for (const promise of msgPromises) {
const msg = await promise;
if (msg) {
messages.push(msg);
}
}
}
// Messages are ordered from oldest to latest within a page (1 page query)
expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText);
expect(bytesToUtf8(messages[1].payload!)).to.eq(symText);
expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText);
expect(messages?.length).eq(3);
!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
});
it("Ordered callback, using start and end time", async function () {
this.timeout(20000);
const now = new Date();
const startTime = new Date();
// Set start time 15 seconds in the past
startTime.setTime(now.getTime() - 15 * 1000);
const message1Timestamp = new Date();
// Set first message was 10 seconds in the past
message1Timestamp.setTime(now.getTime() - 10 * 1000);
const message2Timestamp = new Date();
// Set second message 2 seconds in the past
message2Timestamp.setTime(now.getTime() - 2 * 1000);
const messageTimestamps = [message1Timestamp, message2Timestamp];
const endTime = new Date();
// Set end time 1 second in the past
endTime.setTime(now.getTime() - 1000);
for (let i = 0; i < 2; i++) {
expect(
await nwaku.sendMessage(
NimGoNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: TestContentTopic,
timestamp: messageTimestamps[i]
})
)
).to.be.true;
}
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const firstMessages: IMessage[] = [];
await waku.store.queryWithOrderedCallback(
[TestDecoder],
(msg) => {
if (msg) {
firstMessages.push(msg);
}
},
{
timeFilter: { startTime, endTime: message1Timestamp }
}
);
const bothMessages: IMessage[] = [];
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
bothMessages.push(msg);
},
{
timeFilter: {
startTime,
endTime
}
}
);
expect(firstMessages?.length).eq(1);
expect(firstMessages[0].payload![0]!).eq(0);
expect(bothMessages?.length).eq(2);
});
it("Ordered callback, aborts when callback returns true", async function () {
this.timeout(15_000);
const totalMsgs = 20;
for (let i = 0; i < totalMsgs; i++) {
expect(
await nwaku.sendMessage(
NimGoNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: TestContentTopic
})
)
).to.be.true;
await delay(1); // to ensure each timestamp is unique.
}
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const desiredMsgs = 14;
const messages: IMessage[] = [];
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
return messages.length >= desiredMsgs;
},
{ pageSize: 7 }
);
expect(messages?.length).eq(desiredMsgs);
});
});

View File

@ -0,0 +1,60 @@
import { Decoder, waitForRemotePeer } from "@waku/core";
import { IMessage, LightNode, Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { expect } from "chai";
import { delay, NimGoNode, NOISE_KEY_1 } from "../../src";
export async function sendMessages(
instance: NimGoNode,
numMessages: number,
contentTopic: string,
pubSubTopic: string
): Promise<void> {
for (let i = 0; i < numMessages; i++) {
expect(
await instance.sendMessage(
NimGoNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: contentTopic
}),
pubSubTopic
)
).to.be.true;
}
await delay(1); // to ensure each timestamp is unique.
}
export async function processMessages(
instance: LightNode,
decoders: Array<Decoder>,
expectedTopic: string
): Promise<IMessage[]> {
const localMessages: IMessage[] = [];
let localPromises: Promise<void>[] = [];
for await (const msgPromises of instance.store.queryGenerator(decoders)) {
const _promises = msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {
localMessages.push(msg);
expect(msg.pubSubTopic).to.eq(expectedTopic);
}
});
localPromises = localPromises.concat(_promises);
}
await Promise.all(localPromises);
return localMessages;
}
export async function startAndConnectLightNode(
instance: NimGoNode
): Promise<LightNode> {
const waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1
});
await waku.start();
await waku.dial(await instance.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
return waku;
}