chore: new lightpush tests (#1571)

* new lightpush tests

* fixes after CI run

* split tests into 2 files

* small fix

* address code review comments

* small fix after CI run

---------

Co-authored-by: Danish Arora <35004822+danisharora099@users.noreply.github.com>
This commit is contained in:
fbarbu15 2023-09-19 13:21:03 +03:00 committed by GitHub
parent 4bce7295e0
commit e284c78701
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 769 additions and 514 deletions

View File

@ -10,3 +10,5 @@ export * from "./constants.js";
export * from "./delay.js";
export * from "./log_file.js";
export * from "./node/node.js";
export * from "./teardown.js";
export * from "./message_collector.js";

View File

@ -0,0 +1,208 @@
import { DecodedMessage, DefaultPubSubTopic } from "@waku/core";
import { bytesToUtf8 } from "@waku/utils/bytes";
import { AssertionError, expect } from "chai";
import debug from "debug";
import { MessageRpcResponse } from "./node/interfaces.js";
import { base64ToUtf8, delay, NimGoNode } from "./index.js";
const log = debug("waku:test");
/**
* Class responsible for collecting messages.
* It provides utility methods to interact with the collected messages,
* and offers a way to wait for incoming messages.
*/
export class MessageCollector {
list: Array<MessageRpcResponse | DecodedMessage> = [];
callback: (msg: DecodedMessage) => void = () => {};
constructor(
private contentTopic: string,
private nwaku?: NimGoNode,
private pubSubTopic = DefaultPubSubTopic
) {
if (!this.nwaku) {
this.callback = (msg: DecodedMessage): void => {
log("Got a message");
this.list.push(msg);
};
}
}
get count(): number {
return this.list.length;
}
getMessage(index: number): MessageRpcResponse | DecodedMessage {
return this.list[index];
}
// Type guard to determine if a message is of type MessageRpcResponse
isMessageRpcResponse(
message: MessageRpcResponse | DecodedMessage
): message is MessageRpcResponse {
return (
("payload" in message && typeof message.payload === "string") ||
!!this.nwaku
);
}
async waitForMessages(
numMessages: number,
timeoutDuration: number = 400
): Promise<boolean> {
const startTime = Date.now();
while (this.count < numMessages) {
if (this.nwaku) {
try {
this.list = await this.nwaku.messages(this.pubSubTopic);
} catch (error) {
log(`Can't retrieve messages because of ${error}`);
await delay(10);
}
}
if (Date.now() - startTime > timeoutDuration * numMessages) {
return false;
}
await delay(10);
}
return true;
}
// Verifies a received message against expected values.
verifyReceivedMessage(
index: number,
options: {
expectedMessageText: string | Uint8Array | undefined;
expectedContentTopic?: string;
expectedPubSubTopic?: string;
expectedVersion?: number;
expectedMeta?: Uint8Array;
expectedEphemeral?: boolean;
expectedTimestamp?: bigint;
checkTimestamp?: boolean; // Used to determine if we need to check the timestamp
}
): void {
expect(this.list.length).to.be.greaterThan(
index,
`The message list does not have a message at index ${index}`
);
const message = this.getMessage(index);
expect(message.contentTopic).to.eq(
options.expectedContentTopic || this.contentTopic,
`Message content topic mismatch. Expected: ${
options.expectedContentTopic || this.contentTopic
}. Got: ${message.contentTopic}`
);
expect(message.version).to.eq(
options.expectedVersion || 0,
`Message version mismatch. Expected: ${
options.expectedVersion || 0
}. Got: ${message.version}`
);
if (message.ephemeral !== undefined) {
expect(message.ephemeral).to.eq(
options.expectedEphemeral !== undefined
? options.expectedEphemeral
: false,
`Message ephemeral value mismatch. Expected: ${
options.expectedEphemeral !== undefined
? options.expectedEphemeral
: false
}. Got: ${message.ephemeral}`
);
}
if (this.isMessageRpcResponse(message)) {
// nwaku message specific assertions
const receivedMessageText = message.payload
? base64ToUtf8(message.payload)
: undefined;
expect(receivedMessageText).to.eq(
options.expectedMessageText,
`Message text mismatch. Expected: ${options.expectedMessageText}. Got: ${receivedMessageText}`
);
if (message.timestamp) {
// In we send timestamp in the request we assert that it matches the timestamp in the response +- 1 sec
// We take the 1s deviation because there are some ms diffs in timestamps, probably because of conversions
if (options.expectedTimestamp !== undefined) {
const lowerBound =
BigInt(options.expectedTimestamp) - BigInt(1000000000);
const upperBound =
BigInt(options.expectedTimestamp) + BigInt(1000000000);
if (
message.timestamp < lowerBound ||
message.timestamp > upperBound
) {
throw new AssertionError(
`Message timestamp not within the expected range. Expected between: ${lowerBound} and ${upperBound}. Got: ${message.timestamp}`
);
}
}
// In we don't send timestamp in the request we assert that the timestamp in the response is between now and (now-10s)
else {
const now = BigInt(Date.now()) * BigInt(1_000_000);
const tenSecondsAgo = now - BigInt(10_000_000_000);
if (message.timestamp < tenSecondsAgo || message.timestamp > now) {
throw new AssertionError(
`Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp}`
);
}
}
}
} else {
// js-waku message specific assertions
expect(message.pubSubTopic).to.eq(
options.expectedPubSubTopic || DefaultPubSubTopic,
`Message pub/sub topic mismatch. Expected: ${
options.expectedPubSubTopic || DefaultPubSubTopic
}. Got: ${message.pubSubTopic}`
);
expect(bytesToUtf8(message.payload)).to.eq(
options.expectedMessageText,
`Message text mismatch. Expected: ${
options.expectedMessageText
}. Got: ${bytesToUtf8(message.payload)}`
);
const shouldCheckTimestamp =
options.checkTimestamp !== undefined ? options.checkTimestamp : true;
if (shouldCheckTimestamp && message.timestamp) {
const now = Date.now();
const tenSecondsAgo = now - 10000;
expect(message.timestamp.getTime()).to.be.within(
tenSecondsAgo,
now,
`Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp.getTime()}`
);
}
expect([
options.expectedMeta,
undefined,
new Uint8Array(0)
]).to.deep.include(
message.meta,
`Message meta mismatch. Expected: ${
options.expectedMeta
? JSON.stringify(options.expectedMeta)
: "undefined or " + JSON.stringify(new Uint8Array(0))
}. Got: ${JSON.stringify(message.meta)}`
);
}
}
}

View File

@ -52,4 +52,5 @@ export interface MessageRpcResponse {
contentTopic?: string;
version?: number;
timestamp?: bigint; // Unix epoch time in nanoseconds as a 64-bits integer value.
ephemeral?: boolean;
}

View File

@ -5,6 +5,7 @@ import { DefaultPubSubTopic } from "@waku/core";
import { isDefined } from "@waku/utils";
import { bytesToHex, hexToBytes } from "@waku/utils/bytes";
import debug from "debug";
import pRetry from "p-retry";
import portfinder from "portfinder";
import { existsAsync, mkdirAsync, openAsync } from "../async_fs.js";
@ -164,6 +165,25 @@ export class NimGoNode {
}
}
async startWithRetries(
args: Args,
options: {
retries: number;
}
): Promise<void> {
await pRetry(
async () => {
try {
await this.start(args);
} catch (error) {
log("Nwaku node failed to start:", error);
throw error;
}
},
{ retries: options.retries }
);
}
public async stop(): Promise<void> {
await this.docker?.container?.stop();
delete this.docker;

View File

@ -0,0 +1,23 @@
import { LightNode } from "@waku/interfaces";
import debug from "debug";
import { NimGoNode } from "./index.js";
const log = debug("waku:test");
export function tearDownNodes(
nwakuNodes: NimGoNode[],
wakuNodes: LightNode[]
): void {
nwakuNodes.forEach((nwaku) => {
if (nwaku) {
nwaku.stop().catch((e) => log("Nwaku failed to stop", e));
}
});
wakuNodes.forEach((waku) => {
if (waku) {
waku.stop().catch((e) => log("Waku failed to stop", e));
}
});
}

View File

@ -1,254 +0,0 @@
import {
createDecoder,
createEncoder,
DecodedMessage,
Decoder,
DefaultPubSubTopic,
Encoder,
waitForRemotePeer
} from "@waku/core";
import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces";
// import { createLightNode } from "@waku/sdk";
import { createLightNode } from "@waku/sdk";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import debug from "debug";
import { Context } from "mocha";
import pRetry from "p-retry";
import {
delay,
makeLogFileName,
NimGoNode,
NOISE_KEY_1
} from "../../src/index.js";
// Constants for test configuration.
export const log = debug("waku:test:filter");
export const TestContentTopic = "/test/1/waku-filter";
export const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
export const TestDecoder = createDecoder(TestContentTopic);
export const messageText = "Filtering works!";
export const messagePayload = { payload: utf8ToBytes(messageText) };
/**
* Class responsible for collecting messages.
* It provides utility methods to interact with the collected messages,
* and offers a way to wait for incoming messages.
*/
export class MessageCollector {
list: Array<DecodedMessage> = [];
// Callback to handle incoming messages.
callback = (msg: DecodedMessage): void => {
log("Got a message");
this.list.push(msg);
};
get count(): number {
return this.list.length;
}
getMessage(index: number): DecodedMessage {
return this.list[index];
}
async waitForMessages(
numMessages: number,
timeoutDuration: number = 400
): Promise<boolean> {
const startTime = Date.now();
while (this.count < numMessages) {
if (Date.now() - startTime > timeoutDuration * numMessages) {
return false;
}
await delay(10);
}
return true;
}
// Verifies a received message against expected values.
verifyReceivedMessage(options: {
index: number;
expectedContentTopic?: string;
expectedPubSubTopic?: string;
expectedMessageText?: string | Uint8Array;
expectedVersion?: number;
expectedMeta?: Uint8Array;
expectedEphemeral?: boolean;
checkTimestamp?: boolean; // Used to determine if we need to check the timestamp
}): void {
expect(this.list.length).to.be.greaterThan(
options.index,
`The message list does not have a message at index ${options.index}`
);
const message = this.getMessage(options.index);
expect(message.contentTopic).to.eq(
options.expectedContentTopic || TestContentTopic,
`Message content topic mismatch. Expected: ${
options.expectedContentTopic || TestContentTopic
}. Got: ${message.contentTopic}`
);
expect(message.pubSubTopic).to.eq(
options.expectedPubSubTopic || DefaultPubSubTopic,
`Message pub/sub topic mismatch. Expected: ${
options.expectedPubSubTopic || DefaultPubSubTopic
}. Got: ${message.pubSubTopic}`
);
expect(bytesToUtf8(message.payload)).to.eq(
options.expectedMessageText || messageText,
`Message text mismatch. Expected: ${
options.expectedMessageText || messageText
}. Got: ${bytesToUtf8(message.payload)}`
);
expect(message.version).to.eq(
options.expectedVersion || 0,
`Message version mismatch. Expected: ${
options.expectedVersion || 0
}. Got: ${message.version}`
);
const shouldCheckTimestamp =
options.checkTimestamp !== undefined ? options.checkTimestamp : true;
if (shouldCheckTimestamp && message.timestamp) {
const now = Date.now();
const tenSecondsAgo = now - 10000;
expect(message.timestamp.getTime()).to.be.within(
tenSecondsAgo,
now,
`Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp.getTime()}`
);
}
expect([
options.expectedMeta,
undefined,
new Uint8Array(0)
]).to.deep.include(
message.meta,
`Message meta mismatch. Expected: ${
options.expectedMeta
? JSON.stringify(options.expectedMeta)
: "undefined or " + JSON.stringify(new Uint8Array(0))
}. Got: ${JSON.stringify(message.meta)}`
);
expect(message.ephemeral).to.eq(
options.expectedEphemeral !== undefined
? options.expectedEphemeral
: false,
`Message ephemeral value mismatch. Expected: ${
options.expectedEphemeral !== undefined
? options.expectedEphemeral
: false
}. Got: ${message.ephemeral}`
);
}
}
// Utility to generate test data for multiple topics tests.
export function generateTestData(topicCount: number): {
contentTopics: string[];
encoders: Encoder[];
decoders: Decoder[];
} {
const contentTopics = Array.from(
{ length: topicCount },
(_, i) => `/test/${i + 1}/waku-multi`
);
const encoders = contentTopics.map((topic) =>
createEncoder({ contentTopic: topic })
);
const decoders = contentTopics.map((topic) => createDecoder(topic));
return {
contentTopics,
encoders,
decoders
};
}
// Utility to validate errors related to pings in the subscription.
export async function validatePingError(
subscription: IFilterSubscription
): Promise<void> {
try {
await subscription.ping();
throw new Error(
"Ping was successful but was expected to fail with a specific error."
);
} catch (err) {
if (
err instanceof Error &&
err.message.includes("peer has no subscriptions")
) {
return;
} else {
throw err;
}
}
}
interface SetupReturn {
nwaku: NimGoNode;
waku: LightNode;
subscription: IFilterSubscription;
messageCollector: MessageCollector;
}
// Setup before each test to initialize nodes and message collector.
export async function setupNodes(currentTest: Context): Promise<SetupReturn> {
const nwaku = new NimGoNode(makeLogFileName(currentTest));
// Sometimes the node setup fails, when that happens we retry it max 3 times.
await pRetry(
async () => {
try {
await nwaku.start({
filter: true,
lightpush: true,
relay: true
});
} catch (error) {
log("nwaku node failed to start:", error);
throw error;
}
},
{ retries: 3 }
);
let waku: LightNode | undefined;
try {
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
});
await waku.start();
} catch (error) {
log("jswaku node failed to start:", error);
}
if (waku) {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
const subscription = await waku.filter.createSubscription();
const messageCollector = new MessageCollector();
return { nwaku, waku, subscription, messageCollector };
} else {
throw new Error("Failed to initialize waku");
}
}
export function tearDownNodes(
nwaku: NimGoNode,
waku: LightNode,
nwaku2?: NimGoNode
): void {
!!nwaku && nwaku.stop().catch((e) => log("Nwaku failed to stop", e));
!!nwaku2 && nwaku2.stop().catch((e) => log("Nwaku2 failed to stop", e));
!!waku && waku.stop().catch((e) => log("Waku failed to stop", e));
}

View File

@ -2,17 +2,15 @@ import type { IFilterSubscription, LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { NimGoNode } from "../../src/index.js";
import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js";
import {
MessageCollector,
setupNodes,
tearDownNodes,
runNodes,
TestContentTopic,
TestDecoder,
TestEncoder,
validatePingError
} from "./filter_test_utils.js";
} from "./utils.js";
describe("Waku Filter V2: Ping", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
@ -22,17 +20,15 @@ describe("Waku Filter V2: Ping", function () {
let subscription: IFilterSubscription;
let messageCollector: MessageCollector;
this.afterEach(async function () {
tearDownNodes(nwaku, waku);
});
this.beforeEach(async function () {
this.timeout(15000);
const setup = await setupNodes(this);
nwaku = setup.nwaku;
waku = setup.waku;
subscription = setup.subscription;
messageCollector = setup.messageCollector;
[nwaku, waku] = await runNodes(this);
subscription = await waku.filter.createSubscription();
messageCollector = new MessageCollector(TestContentTopic);
});
this.afterEach(async function () {
tearDownNodes([nwaku], [waku]);
});
it("Ping on subscribed peer", async function () {

View File

@ -6,20 +6,20 @@ import { expect } from "chai";
import {
delay,
MessageCollector,
NimGoNode,
tearDownNodes,
TEST_STRING,
TEST_TIMESTAMPS
} from "../../src/index.js";
import {
MessageCollector,
messageText,
setupNodes,
tearDownNodes,
runNodes,
TestContentTopic,
TestDecoder,
TestEncoder
} from "./filter_test_utils.js";
} from "./utils.js";
describe("Waku Filter V2: FilterPush", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
@ -29,17 +29,15 @@ describe("Waku Filter V2: FilterPush", function () {
let subscription: IFilterSubscription;
let messageCollector: MessageCollector;
this.afterEach(async function () {
tearDownNodes(nwaku, waku);
});
this.beforeEach(async function () {
this.timeout(15000);
const setup = await setupNodes(this);
nwaku = setup.nwaku;
waku = setup.waku;
subscription = setup.subscription;
messageCollector = setup.messageCollector;
[nwaku, waku] = await runNodes(this);
subscription = await waku.filter.createSubscription();
messageCollector = new MessageCollector(TestContentTopic);
});
this.afterEach(async function () {
tearDownNodes([nwaku], [waku]);
});
TEST_STRING.forEach((testItem) => {
@ -50,8 +48,7 @@ describe("Waku Filter V2: FilterPush", function () {
});
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage({
index: 0,
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: testItem.value
});
});
@ -72,8 +69,8 @@ describe("Waku Filter V2: FilterPush", function () {
]);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage({
index: 0,
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
checkTimestamp: false
});
@ -82,7 +79,7 @@ describe("Waku Filter V2: FilterPush", function () {
if (testItem == undefined) {
expect(timestamp).to.eq(undefined);
}
if (timestamp !== undefined) {
if (timestamp !== undefined && timestamp instanceof Date) {
expect(testItem?.toString()).to.contain(timestamp.getTime().toString());
}
});
@ -219,7 +216,9 @@ describe("Waku Filter V2: FilterPush", function () {
]);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage({ index: 0 });
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText
});
});
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
@ -245,12 +244,10 @@ describe("Waku Filter V2: FilterPush", function () {
// Confirm both messages were received.
expect(await messageCollector.waitForMessages(2)).to.eq(true);
messageCollector.verifyReceivedMessage({
index: 0,
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1"
});
messageCollector.verifyReceivedMessage({
index: 1,
messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2"
});
});
@ -270,12 +267,10 @@ describe("Waku Filter V2: FilterPush", function () {
// Confirm both messages were received.
expect(await messageCollector.waitForMessages(2)).to.eq(true);
messageCollector.verifyReceivedMessage({
index: 0,
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1"
});
messageCollector.verifyReceivedMessage({
index: 1,
messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2"
});
});

View File

@ -12,21 +12,21 @@ import { expect } from "chai";
import {
delay,
makeLogFileName,
MessageCollector,
NimGoNode,
tearDownNodes,
TEST_STRING
} from "../../src/index.js";
import {
generateTestData,
MessageCollector,
messagePayload,
messageText,
setupNodes,
tearDownNodes,
runNodes,
TestContentTopic,
TestDecoder,
TestEncoder
} from "./filter_test_utils.js";
} from "./utils.js";
describe("Waku Filter V2: Subscribe", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
@ -37,17 +37,15 @@ describe("Waku Filter V2: Subscribe", function () {
let subscription: IFilterSubscription;
let messageCollector: MessageCollector;
this.afterEach(async function () {
tearDownNodes(nwaku, waku, nwaku2);
});
this.beforeEach(async function () {
this.timeout(15000);
const setup = await setupNodes(this);
nwaku = setup.nwaku;
waku = setup.waku;
subscription = setup.subscription;
messageCollector = setup.messageCollector;
[nwaku, waku] = await runNodes(this);
subscription = await waku.filter.createSubscription();
messageCollector = new MessageCollector(TestContentTopic);
});
this.afterEach(async function () {
tearDownNodes([nwaku, nwaku2], [waku]);
});
it("Subscribe and receive messages via lightPush", async function () {
@ -56,7 +54,9 @@ describe("Waku Filter V2: Subscribe", function () {
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage({ index: 0 });
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText
});
expect((await nwaku.messages()).length).to.eq(1);
});
@ -74,7 +74,9 @@ describe("Waku Filter V2: Subscribe", function () {
);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage({ index: 0 });
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText
});
expect((await nwaku.messages()).length).to.eq(1);
});
@ -84,7 +86,9 @@ describe("Waku Filter V2: Subscribe", function () {
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage({ index: 0 });
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText
});
// Send another message on the same topic.
const newMessageText = "Filtering still works!";
@ -94,9 +98,8 @@ describe("Waku Filter V2: Subscribe", function () {
// Verify that the second message was successfully received.
expect(await messageCollector.waitForMessages(2)).to.eq(true);
messageCollector.verifyReceivedMessage({
expectedMessageText: newMessageText,
index: 1
messageCollector.verifyReceivedMessage(1, {
expectedMessageText: newMessageText
});
expect((await nwaku.messages()).length).to.eq(2);
});
@ -106,7 +109,9 @@ describe("Waku Filter V2: Subscribe", function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage({ index: 0 });
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText
});
// Modify subscription to include a new content topic and send a message.
const newMessageText = "Filtering still works!";
@ -119,18 +124,16 @@ describe("Waku Filter V2: Subscribe", function () {
payload: utf8ToBytes(newMessageText)
});
expect(await messageCollector.waitForMessages(2)).to.eq(true);
messageCollector.verifyReceivedMessage({
messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: newContentTopic,
expectedMessageText: newMessageText,
index: 1
expectedMessageText: newMessageText
});
// Send another message on the initial content topic to verify it still works.
await waku.lightPush.send(TestEncoder, newMessagePayload);
expect(await messageCollector.waitForMessages(3)).to.eq(true);
messageCollector.verifyReceivedMessage({
expectedMessageText: newMessageText,
index: 2
messageCollector.verifyReceivedMessage(2, {
expectedMessageText: newMessageText
});
expect((await nwaku.messages()).length).to.eq(3);
});
@ -154,10 +157,9 @@ describe("Waku Filter V2: Subscribe", function () {
// Verify that each message was received on the corresponding topic.
expect(await messageCollector.waitForMessages(20)).to.eq(true);
td.contentTopics.forEach((topic, index) => {
messageCollector.verifyReceivedMessage({
messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
index: index
expectedMessageText: `Message for Topic ${index + 1}`
});
});
});
@ -179,10 +181,9 @@ describe("Waku Filter V2: Subscribe", function () {
// Verify that each message was received on the corresponding topic.
expect(await messageCollector.waitForMessages(30)).to.eq(true);
td.contentTopics.forEach((topic, index) => {
messageCollector.verifyReceivedMessage({
messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
index: index
expectedMessageText: `Message for Topic ${index + 1}`
});
});
});
@ -253,12 +254,10 @@ describe("Waku Filter V2: Subscribe", function () {
// Confirm both messages were received.
expect(await messageCollector.waitForMessages(2)).to.eq(true);
messageCollector.verifyReceivedMessage({
index: 0,
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1"
});
messageCollector.verifyReceivedMessage({
index: 1,
messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2"
});
});
@ -273,8 +272,8 @@ describe("Waku Filter V2: Subscribe", function () {
await waku.lightPush.send(newEncoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage({
index: 0,
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: newContentTopic
});
});
@ -295,12 +294,10 @@ describe("Waku Filter V2: Subscribe", function () {
// Check if both messages were received
expect(await messageCollector.waitForMessages(2)).to.eq(true);
messageCollector.verifyReceivedMessage({
index: 0,
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1"
});
messageCollector.verifyReceivedMessage({
index: 1,
messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: newContentTopic,
expectedMessageText: "M2"
});
@ -332,12 +329,10 @@ describe("Waku Filter V2: Subscribe", function () {
// Check if both messages were received
expect(await messageCollector.waitForMessages(2)).to.eq(true);
messageCollector.verifyReceivedMessage({
index: 0,
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1"
});
messageCollector.verifyReceivedMessage({
index: 1,
messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: newContentTopic,
expectedMessageText: "M2"
});

View File

@ -3,18 +3,17 @@ import type { IFilterSubscription, LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { NimGoNode } from "../../src/index.js";
import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js";
import {
generateTestData,
MessageCollector,
messagePayload,
setupNodes,
tearDownNodes,
messageText,
runNodes,
TestContentTopic,
TestDecoder,
TestEncoder
} from "./filter_test_utils.js";
} from "./utils.js";
describe("Waku Filter V2: Unsubscribe", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
@ -24,17 +23,15 @@ describe("Waku Filter V2: Unsubscribe", function () {
let subscription: IFilterSubscription;
let messageCollector: MessageCollector;
this.afterEach(async function () {
tearDownNodes(nwaku, waku);
});
this.beforeEach(async function () {
this.timeout(15000);
const setup = await setupNodes(this);
nwaku = setup.nwaku;
waku = setup.waku;
subscription = setup.subscription;
messageCollector = setup.messageCollector;
[nwaku, waku] = await runNodes(this);
subscription = await waku.filter.createSubscription();
messageCollector = new MessageCollector(TestContentTopic);
});
this.afterEach(async function () {
tearDownNodes([nwaku], [waku]);
});
it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {
@ -48,7 +45,9 @@ describe("Waku Filter V2: Unsubscribe", function () {
expect(await messageCollector.waitForMessages(2)).to.eq(false);
// Check that from 2 messages send only the 1st was received
messageCollector.verifyReceivedMessage({ index: 0 });
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText
});
expect(messageCollector.count).to.eq(1);
expect((await nwaku.messages()).length).to.eq(2);
});

View File

@ -0,0 +1,97 @@
import {
createDecoder,
createEncoder,
Decoder,
Encoder,
waitForRemotePeer
} from "@waku/core";
import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { utf8ToBytes } from "@waku/utils/bytes";
import debug from "debug";
import { Context } from "mocha";
import { makeLogFileName, NimGoNode, NOISE_KEY_1 } from "../../src/index.js";
// Constants for test configuration.
export const log = debug("waku:test:filter");
export const TestContentTopic = "/test/1/waku-filter";
export const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
export const TestDecoder = createDecoder(TestContentTopic);
export const messageText = "Filtering works!";
export const messagePayload = { payload: utf8ToBytes(messageText) };
// Utility to generate test data for multiple topics tests.
export function generateTestData(topicCount: number): {
contentTopics: string[];
encoders: Encoder[];
decoders: Decoder[];
} {
const contentTopics = Array.from(
{ length: topicCount },
(_, i) => `/test/${i + 1}/waku-multi`
);
const encoders = contentTopics.map((topic) =>
createEncoder({ contentTopic: topic })
);
const decoders = contentTopics.map((topic) => createDecoder(topic));
return {
contentTopics,
encoders,
decoders
};
}
// Utility to validate errors related to pings in the subscription.
export async function validatePingError(
subscription: IFilterSubscription
): Promise<void> {
try {
await subscription.ping();
throw new Error(
"Ping was successful but was expected to fail with a specific error."
);
} catch (err) {
if (
err instanceof Error &&
err.message.includes("peer has no subscriptions")
) {
return;
} else {
throw err;
}
}
}
export async function runNodes(
currentTest: Context
): Promise<[NimGoNode, LightNode]> {
const nwaku = new NimGoNode(makeLogFileName(currentTest));
await nwaku.startWithRetries(
{
filter: true,
lightpush: true,
relay: true
},
{ retries: 3 }
);
let waku: LightNode | undefined;
try {
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
});
await waku.start();
} catch (error) {
log("jswaku node failed to start:", error);
}
if (waku) {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
return [nwaku, waku];
} else {
throw new Error("Failed to initialize waku");
}
}

View File

@ -0,0 +1,49 @@
import { LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js";
import {
messageText,
runNodes,
TestContentTopic,
TestEncoder
} from "./utils.js";
describe("Waku Light Push [node only] - custom pubsub topic", function () {
this.timeout(15000);
let waku: LightNode;
let nwaku: NimGoNode;
let messageCollector: MessageCollector;
const customPubSubTopic = "/waku/2/custom-dapp/proto";
beforeEach(async function () {
[nwaku, waku] = await runNodes(this, customPubSubTopic);
messageCollector = new MessageCollector(
TestContentTopic,
nwaku,
customPubSubTopic
);
});
this.afterEach(async function () {
tearDownNodes([nwaku], [waku]);
});
it("Push message", async function () {
const nimPeerId = await nwaku.getPeerId();
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(messageText)
});
expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString());
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
});
});

View File

@ -0,0 +1,234 @@
import { createEncoder, DefaultPubSubTopic } from "@waku/core";
import { IRateLimitProof, LightNode, SendError } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import {
MessageCollector,
NimGoNode,
tearDownNodes,
TEST_STRING
} from "../../src/index.js";
import { generateRandomUint8Array } from "../../src/random_array.js";
import {
messagePayload,
messageText,
runNodes,
TestContentTopic,
TestEncoder
} from "./utils.js";
describe("Waku Light Push [node only]", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(15000);
let waku: LightNode;
let nwaku: NimGoNode;
let messageCollector: MessageCollector;
this.beforeEach(async function () {
this.timeout(15000);
[nwaku, waku] = await runNodes(this);
messageCollector = new MessageCollector(
TestContentTopic,
nwaku,
DefaultPubSubTopic
);
});
this.afterEach(async function () {
tearDownNodes([nwaku], [waku]);
});
TEST_STRING.forEach((testItem) => {
it(`Push message with ${testItem.description} payload`, async function () {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(testItem.value)
});
expect(pushResponse.recipients.length).to.eq(1);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: testItem.value
});
});
});
it("Push 30 different messages", async function () {
const generateMessageText = (index: number): string => `M${index}`;
for (let i = 0; i < 30; i++) {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(generateMessageText(i))
});
expect(pushResponse.recipients.length).to.eq(1);
}
expect(await messageCollector.waitForMessages(30)).to.eq(true);
for (let i = 0; i < 30; i++) {
messageCollector.verifyReceivedMessage(i, {
expectedMessageText: generateMessageText(i)
});
}
});
it("Fails to push message with empty payload", async function () {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("")
});
if (nwaku.type() == "go-waku") {
expect(pushResponse.recipients.length).to.eq(1);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: undefined
});
} else {
expect(pushResponse.recipients.length).to.eq(0);
expect(pushResponse.errors).to.include(SendError.NO_RPC_RESPONSE);
expect(await messageCollector.waitForMessages(1)).to.eq(false);
}
});
TEST_STRING.forEach((testItem) => {
it(`Push message with content topic containing ${testItem.description}`, async function () {
const customEncoder = createEncoder({
contentTopic: testItem.value
});
const pushResponse = await waku.lightPush.send(
customEncoder,
messagePayload
);
expect(pushResponse.recipients.length).to.eq(1);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: testItem.value
});
});
});
it("Fails to push message with empty content topic", async function () {
try {
createEncoder({ contentTopic: "" });
expect.fail("Expected an error but didn't get one");
} catch (error) {
expect((error as Error).message).to.equal(
"Content topic must be specified"
);
}
});
it("Push message with meta", async function () {
const customTestEncoder = createEncoder({
contentTopic: TestContentTopic,
metaSetter: () => new Uint8Array(10)
});
const pushResponse = await waku.lightPush.send(
customTestEncoder,
messagePayload
);
expect(pushResponse.recipients.length).to.eq(1);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText
});
});
it("Fails to push message with large meta", async function () {
const customTestEncoder = createEncoder({
contentTopic: TestContentTopic,
metaSetter: () => new Uint8Array(10 ** 6)
});
const pushResponse = await waku.lightPush.send(
customTestEncoder,
messagePayload
);
if (nwaku.type() == "go-waku") {
expect(pushResponse.recipients.length).to.eq(1);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText
});
} else {
expect(pushResponse.recipients.length).to.eq(0);
expect(pushResponse.errors).to.include(SendError.NO_RPC_RESPONSE);
expect(await messageCollector.waitForMessages(1)).to.eq(false);
}
});
it("Push message with rate limit", async function () {
const rateLimitProof: IRateLimitProof = {
proof: utf8ToBytes("proofData"),
merkleRoot: utf8ToBytes("merkleRootData"),
epoch: utf8ToBytes("epochData"),
shareX: utf8ToBytes("shareXData"),
shareY: utf8ToBytes("shareYData"),
nullifier: utf8ToBytes("nullifierData"),
rlnIdentifier: utf8ToBytes("rlnIdentifierData")
};
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(messageText),
rateLimitProof: rateLimitProof
});
expect(pushResponse.recipients.length).to.eq(1);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText
});
});
[
Date.now() - 3600000 * 24 * 356,
Date.now() - 3600000,
Date.now() + 3600000
].forEach((testItem) => {
it(`Push message with custom timestamp: ${testItem}`, async function () {
const customTimeNanos = BigInt(testItem) * BigInt(1000000);
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(messageText),
timestamp: new Date(testItem)
});
expect(pushResponse.recipients.length).to.eq(1);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedTimestamp: customTimeNanos
});
});
});
it("Push message equal or less that 1MB", async function () {
const oneMbPayload = generateRandomUint8Array(1024 ** 2);
let pushResponse = await waku.lightPush.send(TestEncoder, {
payload: oneMbPayload
});
expect(pushResponse.recipients.length).to.greaterThan(0);
const bigPayload = generateRandomUint8Array(65536);
pushResponse = await waku.lightPush.send(TestEncoder, {
payload: bigPayload
});
expect(pushResponse.recipients.length).to.greaterThan(0);
});
it("Fails to push message bigger that 1MB", async function () {
const MB = 1024 ** 2;
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: generateRandomUint8Array(MB + 65536)
});
expect(pushResponse.recipients.length).to.eq(0);
expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG);
expect(await messageCollector.waitForMessages(1)).to.eq(false);
});
});

View File

@ -0,0 +1,48 @@
import { createEncoder, waitForRemotePeer } from "@waku/core";
import { LightNode, Protocols } from "@waku/interfaces";
import { createLightNode, utf8ToBytes } from "@waku/sdk";
import debug from "debug";
import { makeLogFileName, NimGoNode, NOISE_KEY_1 } from "../../src/index.js";
// Constants for test configuration.
export const log = debug("waku:test:lightpush");
export const TestContentTopic = "/test/1/waku-light-push/utf8";
export const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
export const messageText = "Light Push works!";
export const messagePayload = { payload: utf8ToBytes(messageText) };
export async function runNodes(
context: Mocha.Context,
pubSubTopic?: string
): Promise<[NimGoNode, LightNode]> {
const nwakuOptional = pubSubTopic ? { topic: pubSubTopic } : {};
const nwaku = new NimGoNode(makeLogFileName(context));
await nwaku.startWithRetries(
{
lightpush: true,
relay: true,
...nwakuOptional
},
{ retries: 3 }
);
let waku: LightNode | undefined;
try {
waku = await createLightNode({
pubSubTopic,
staticNoiseKey: NOISE_KEY_1
});
await waku.start();
} catch (error) {
log("jswaku node failed to start:", error);
}
if (waku) {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
return [nwaku, waku];
} else {
throw new Error("Failed to initialize waku");
}
}

View File

@ -1,158 +0,0 @@
import { createEncoder, waitForRemotePeer } from "@waku/core";
import { LightNode, SendError } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import debug from "debug";
import {
base64ToUtf8,
delay,
makeLogFileName,
NimGoNode,
NOISE_KEY_1
} from "../src/index.js";
import { MessageRpcResponse } from "../src/node/interfaces.js";
import { generateRandomUint8Array } from "../src/random_array.js";
const log = debug("waku:test:lightpush");
const TestContentTopic = "/test/1/waku-light-push/utf8";
const TestEncoder = createEncoder({
contentTopic: TestContentTopic
});
async function runNodes(
context: Mocha.Context,
pubSubTopic?: string
): Promise<[NimGoNode, LightNode]> {
const nwakuOptional = pubSubTopic ? { topic: pubSubTopic } : {};
const nwaku = new NimGoNode(makeLogFileName(context));
await nwaku.start({
lightpush: true,
relay: true,
...nwakuOptional
});
const waku = await createLightNode({
pubSubTopic,
staticNoiseKey: NOISE_KEY_1
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
return [nwaku, waku];
}
describe("Waku Light Push [node only]", () => {
let waku: LightNode;
let nwaku: NimGoNode;
beforeEach(async function () {
this.timeout(15_000);
[nwaku, waku] = await runNodes(this);
});
afterEach(async function () {
try {
await nwaku?.stop();
await waku?.stop();
} catch (e) {
console.error("Failed to stop nodes: ", e);
}
});
it("Push successfully", async function () {
this.timeout(15_000);
const messageText = "Light Push works!";
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(messageText)
});
expect(pushResponse.recipients.length).to.eq(1);
let msgs: MessageRpcResponse[] = [];
while (msgs.length === 0) {
await delay(200);
msgs = await nwaku.messages();
}
expect(msgs[0].contentTopic).to.equal(TestContentTopic);
expect(base64ToUtf8(msgs[0].payload)).to.equal(messageText);
});
it("Pushes messages equal or less that 1MB", async function () {
this.timeout(15_000);
const MB = 1024 ** 2;
let pushResponse = await waku.lightPush.send(TestEncoder, {
payload: generateRandomUint8Array(MB)
});
expect(pushResponse.recipients.length).to.greaterThan(0);
pushResponse = await waku.lightPush.send(TestEncoder, {
payload: generateRandomUint8Array(65536)
});
expect(pushResponse.recipients.length).to.greaterThan(0);
});
it("Fails to push message bigger that 1MB", async function () {
this.timeout(15_000);
const MB = 1024 ** 2;
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: generateRandomUint8Array(MB + 65536)
});
expect(pushResponse.recipients.length).to.eq(0);
expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG);
});
});
describe("Waku Light Push [node only] - custom pubsub topic", () => {
let waku: LightNode;
let nwaku: NimGoNode;
const customPubSubTopic = "/waku/2/custom-dapp/proto";
beforeEach(async function () {
this.timeout(15_000);
[nwaku, waku] = await runNodes(this, customPubSubTopic);
});
afterEach(async function () {
try {
await nwaku?.stop();
await waku?.stop();
} catch (e) {
console.error("Failed to stop nodes: ", e);
}
});
it("Push message", async function () {
this.timeout(15_000);
const nimPeerId = await nwaku.getPeerId();
const messageText = "Light Push works!";
log("Send message via lightpush");
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(messageText)
});
log("Ack received", pushResponse);
expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString());
let msgs: MessageRpcResponse[] = [];
log("Waiting for message to show in nwaku");
while (msgs.length === 0) {
await delay(200);
msgs = await nwaku.messages(customPubSubTopic);
}
expect(msgs[0].contentTopic).to.equal(TestContentTopic);
expect(base64ToUtf8(msgs[0].payload)).to.equal(messageText);
});
});