mirror of https://github.com/waku-org/js-waku.git
Merge branch 'master' into chore/split-ci-into-chunks
This commit is contained in:
commit
4d54027f35
|
@ -10,3 +10,5 @@ export * from "./constants.js";
|
|||
export * from "./delay.js";
|
||||
export * from "./log_file.js";
|
||||
export * from "./node/node.js";
|
||||
export * from "./teardown.js";
|
||||
export * from "./message_collector.js";
|
||||
|
|
|
@ -0,0 +1,208 @@
|
|||
import { DecodedMessage, DefaultPubSubTopic } from "@waku/core";
|
||||
import { bytesToUtf8 } from "@waku/utils/bytes";
|
||||
import { AssertionError, expect } from "chai";
|
||||
import debug from "debug";
|
||||
|
||||
import { MessageRpcResponse } from "./node/interfaces.js";
|
||||
|
||||
import { base64ToUtf8, delay, NimGoNode } from "./index.js";
|
||||
|
||||
const log = debug("waku:test");
|
||||
|
||||
/**
|
||||
* Class responsible for collecting messages.
|
||||
* It provides utility methods to interact with the collected messages,
|
||||
* and offers a way to wait for incoming messages.
|
||||
*/
|
||||
export class MessageCollector {
|
||||
list: Array<MessageRpcResponse | DecodedMessage> = [];
|
||||
callback: (msg: DecodedMessage) => void = () => {};
|
||||
|
||||
constructor(
|
||||
private contentTopic: string,
|
||||
private nwaku?: NimGoNode,
|
||||
private pubSubTopic = DefaultPubSubTopic
|
||||
) {
|
||||
if (!this.nwaku) {
|
||||
this.callback = (msg: DecodedMessage): void => {
|
||||
log("Got a message");
|
||||
this.list.push(msg);
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
get count(): number {
|
||||
return this.list.length;
|
||||
}
|
||||
|
||||
getMessage(index: number): MessageRpcResponse | DecodedMessage {
|
||||
return this.list[index];
|
||||
}
|
||||
|
||||
// Type guard to determine if a message is of type MessageRpcResponse
|
||||
isMessageRpcResponse(
|
||||
message: MessageRpcResponse | DecodedMessage
|
||||
): message is MessageRpcResponse {
|
||||
return (
|
||||
("payload" in message && typeof message.payload === "string") ||
|
||||
!!this.nwaku
|
||||
);
|
||||
}
|
||||
|
||||
async waitForMessages(
|
||||
numMessages: number,
|
||||
timeoutDuration: number = 400
|
||||
): Promise<boolean> {
|
||||
const startTime = Date.now();
|
||||
|
||||
while (this.count < numMessages) {
|
||||
if (this.nwaku) {
|
||||
try {
|
||||
this.list = await this.nwaku.messages(this.pubSubTopic);
|
||||
} catch (error) {
|
||||
log(`Can't retrieve messages because of ${error}`);
|
||||
await delay(10);
|
||||
}
|
||||
}
|
||||
|
||||
if (Date.now() - startTime > timeoutDuration * numMessages) {
|
||||
return false;
|
||||
}
|
||||
|
||||
await delay(10);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Verifies a received message against expected values.
|
||||
verifyReceivedMessage(
|
||||
index: number,
|
||||
options: {
|
||||
expectedMessageText: string | Uint8Array | undefined;
|
||||
expectedContentTopic?: string;
|
||||
expectedPubSubTopic?: string;
|
||||
expectedVersion?: number;
|
||||
expectedMeta?: Uint8Array;
|
||||
expectedEphemeral?: boolean;
|
||||
expectedTimestamp?: bigint;
|
||||
checkTimestamp?: boolean; // Used to determine if we need to check the timestamp
|
||||
}
|
||||
): void {
|
||||
expect(this.list.length).to.be.greaterThan(
|
||||
index,
|
||||
`The message list does not have a message at index ${index}`
|
||||
);
|
||||
|
||||
const message = this.getMessage(index);
|
||||
expect(message.contentTopic).to.eq(
|
||||
options.expectedContentTopic || this.contentTopic,
|
||||
`Message content topic mismatch. Expected: ${
|
||||
options.expectedContentTopic || this.contentTopic
|
||||
}. Got: ${message.contentTopic}`
|
||||
);
|
||||
|
||||
expect(message.version).to.eq(
|
||||
options.expectedVersion || 0,
|
||||
`Message version mismatch. Expected: ${
|
||||
options.expectedVersion || 0
|
||||
}. Got: ${message.version}`
|
||||
);
|
||||
|
||||
if (message.ephemeral !== undefined) {
|
||||
expect(message.ephemeral).to.eq(
|
||||
options.expectedEphemeral !== undefined
|
||||
? options.expectedEphemeral
|
||||
: false,
|
||||
`Message ephemeral value mismatch. Expected: ${
|
||||
options.expectedEphemeral !== undefined
|
||||
? options.expectedEphemeral
|
||||
: false
|
||||
}. Got: ${message.ephemeral}`
|
||||
);
|
||||
}
|
||||
|
||||
if (this.isMessageRpcResponse(message)) {
|
||||
// nwaku message specific assertions
|
||||
const receivedMessageText = message.payload
|
||||
? base64ToUtf8(message.payload)
|
||||
: undefined;
|
||||
|
||||
expect(receivedMessageText).to.eq(
|
||||
options.expectedMessageText,
|
||||
`Message text mismatch. Expected: ${options.expectedMessageText}. Got: ${receivedMessageText}`
|
||||
);
|
||||
|
||||
if (message.timestamp) {
|
||||
// In we send timestamp in the request we assert that it matches the timestamp in the response +- 1 sec
|
||||
// We take the 1s deviation because there are some ms diffs in timestamps, probably because of conversions
|
||||
if (options.expectedTimestamp !== undefined) {
|
||||
const lowerBound =
|
||||
BigInt(options.expectedTimestamp) - BigInt(1000000000);
|
||||
const upperBound =
|
||||
BigInt(options.expectedTimestamp) + BigInt(1000000000);
|
||||
|
||||
if (
|
||||
message.timestamp < lowerBound ||
|
||||
message.timestamp > upperBound
|
||||
) {
|
||||
throw new AssertionError(
|
||||
`Message timestamp not within the expected range. Expected between: ${lowerBound} and ${upperBound}. Got: ${message.timestamp}`
|
||||
);
|
||||
}
|
||||
}
|
||||
// In we don't send timestamp in the request we assert that the timestamp in the response is between now and (now-10s)
|
||||
else {
|
||||
const now = BigInt(Date.now()) * BigInt(1_000_000);
|
||||
const tenSecondsAgo = now - BigInt(10_000_000_000);
|
||||
|
||||
if (message.timestamp < tenSecondsAgo || message.timestamp > now) {
|
||||
throw new AssertionError(
|
||||
`Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp}`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// js-waku message specific assertions
|
||||
expect(message.pubSubTopic).to.eq(
|
||||
options.expectedPubSubTopic || DefaultPubSubTopic,
|
||||
`Message pub/sub topic mismatch. Expected: ${
|
||||
options.expectedPubSubTopic || DefaultPubSubTopic
|
||||
}. Got: ${message.pubSubTopic}`
|
||||
);
|
||||
|
||||
expect(bytesToUtf8(message.payload)).to.eq(
|
||||
options.expectedMessageText,
|
||||
`Message text mismatch. Expected: ${
|
||||
options.expectedMessageText
|
||||
}. Got: ${bytesToUtf8(message.payload)}`
|
||||
);
|
||||
|
||||
const shouldCheckTimestamp =
|
||||
options.checkTimestamp !== undefined ? options.checkTimestamp : true;
|
||||
if (shouldCheckTimestamp && message.timestamp) {
|
||||
const now = Date.now();
|
||||
const tenSecondsAgo = now - 10000;
|
||||
expect(message.timestamp.getTime()).to.be.within(
|
||||
tenSecondsAgo,
|
||||
now,
|
||||
`Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp.getTime()}`
|
||||
);
|
||||
}
|
||||
|
||||
expect([
|
||||
options.expectedMeta,
|
||||
undefined,
|
||||
new Uint8Array(0)
|
||||
]).to.deep.include(
|
||||
message.meta,
|
||||
`Message meta mismatch. Expected: ${
|
||||
options.expectedMeta
|
||||
? JSON.stringify(options.expectedMeta)
|
||||
: "undefined or " + JSON.stringify(new Uint8Array(0))
|
||||
}. Got: ${JSON.stringify(message.meta)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -52,4 +52,5 @@ export interface MessageRpcResponse {
|
|||
contentTopic?: string;
|
||||
version?: number;
|
||||
timestamp?: bigint; // Unix epoch time in nanoseconds as a 64-bits integer value.
|
||||
ephemeral?: boolean;
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import { DefaultPubSubTopic } from "@waku/core";
|
|||
import { isDefined } from "@waku/utils";
|
||||
import { bytesToHex, hexToBytes } from "@waku/utils/bytes";
|
||||
import debug from "debug";
|
||||
import pRetry from "p-retry";
|
||||
import portfinder from "portfinder";
|
||||
|
||||
import { existsAsync, mkdirAsync, openAsync } from "../async_fs.js";
|
||||
|
@ -164,6 +165,25 @@ export class NimGoNode {
|
|||
}
|
||||
}
|
||||
|
||||
async startWithRetries(
|
||||
args: Args,
|
||||
options: {
|
||||
retries: number;
|
||||
}
|
||||
): Promise<void> {
|
||||
await pRetry(
|
||||
async () => {
|
||||
try {
|
||||
await this.start(args);
|
||||
} catch (error) {
|
||||
log("Nwaku node failed to start:", error);
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
{ retries: options.retries }
|
||||
);
|
||||
}
|
||||
|
||||
public async stop(): Promise<void> {
|
||||
await this.docker?.container?.stop();
|
||||
delete this.docker;
|
||||
|
@ -189,6 +209,17 @@ export class NimGoNode {
|
|||
return this.rpcCall<RpcInfoResponse>("get_waku_v2_debug_v1_info", []);
|
||||
}
|
||||
|
||||
async ensureSubscriptions(
|
||||
pubsubTopics: [string] = [DefaultPubSubTopic]
|
||||
): Promise<boolean> {
|
||||
this.checkProcess();
|
||||
|
||||
return this.rpcCall<boolean>(
|
||||
"post_waku_v2_relay_v1_subscriptions",
|
||||
pubsubTopics
|
||||
);
|
||||
}
|
||||
|
||||
async sendMessage(
|
||||
message: MessageRpcQuery,
|
||||
pubSubTopic: string = DefaultPubSubTopic
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
import { LightNode } from "@waku/interfaces";
|
||||
import debug from "debug";
|
||||
|
||||
import { NimGoNode } from "./index.js";
|
||||
|
||||
const log = debug("waku:test");
|
||||
|
||||
export function tearDownNodes(
|
||||
nwakuNodes: NimGoNode[],
|
||||
wakuNodes: LightNode[]
|
||||
): void {
|
||||
nwakuNodes.forEach((nwaku) => {
|
||||
if (nwaku) {
|
||||
nwaku.stop().catch((e) => log("Nwaku failed to stop", e));
|
||||
}
|
||||
});
|
||||
|
||||
wakuNodes.forEach((waku) => {
|
||||
if (waku) {
|
||||
waku.stop().catch((e) => log("Waku failed to stop", e));
|
||||
}
|
||||
});
|
||||
}
|
|
@ -1,254 +0,0 @@
|
|||
import {
|
||||
createDecoder,
|
||||
createEncoder,
|
||||
DecodedMessage,
|
||||
Decoder,
|
||||
DefaultPubSubTopic,
|
||||
Encoder,
|
||||
waitForRemotePeer
|
||||
} from "@waku/core";
|
||||
import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces";
|
||||
// import { createLightNode } from "@waku/sdk";
|
||||
import { createLightNode } from "@waku/sdk";
|
||||
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
import debug from "debug";
|
||||
import { Context } from "mocha";
|
||||
import pRetry from "p-retry";
|
||||
|
||||
import {
|
||||
delay,
|
||||
makeLogFileName,
|
||||
NimGoNode,
|
||||
NOISE_KEY_1
|
||||
} from "../../src/index.js";
|
||||
|
||||
// Constants for test configuration.
|
||||
export const log = debug("waku:test:filter");
|
||||
export const TestContentTopic = "/test/1/waku-filter";
|
||||
export const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
|
||||
export const TestDecoder = createDecoder(TestContentTopic);
|
||||
export const messageText = "Filtering works!";
|
||||
export const messagePayload = { payload: utf8ToBytes(messageText) };
|
||||
|
||||
/**
|
||||
* Class responsible for collecting messages.
|
||||
* It provides utility methods to interact with the collected messages,
|
||||
* and offers a way to wait for incoming messages.
|
||||
*/
|
||||
export class MessageCollector {
|
||||
list: Array<DecodedMessage> = [];
|
||||
|
||||
// Callback to handle incoming messages.
|
||||
callback = (msg: DecodedMessage): void => {
|
||||
log("Got a message");
|
||||
this.list.push(msg);
|
||||
};
|
||||
|
||||
get count(): number {
|
||||
return this.list.length;
|
||||
}
|
||||
|
||||
getMessage(index: number): DecodedMessage {
|
||||
return this.list[index];
|
||||
}
|
||||
|
||||
async waitForMessages(
|
||||
numMessages: number,
|
||||
timeoutDuration: number = 400
|
||||
): Promise<boolean> {
|
||||
const startTime = Date.now();
|
||||
|
||||
while (this.count < numMessages) {
|
||||
if (Date.now() - startTime > timeoutDuration * numMessages) {
|
||||
return false;
|
||||
}
|
||||
await delay(10);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Verifies a received message against expected values.
|
||||
verifyReceivedMessage(options: {
|
||||
index: number;
|
||||
expectedContentTopic?: string;
|
||||
expectedPubSubTopic?: string;
|
||||
expectedMessageText?: string | Uint8Array;
|
||||
expectedVersion?: number;
|
||||
expectedMeta?: Uint8Array;
|
||||
expectedEphemeral?: boolean;
|
||||
checkTimestamp?: boolean; // Used to determine if we need to check the timestamp
|
||||
}): void {
|
||||
expect(this.list.length).to.be.greaterThan(
|
||||
options.index,
|
||||
`The message list does not have a message at index ${options.index}`
|
||||
);
|
||||
|
||||
const message = this.getMessage(options.index);
|
||||
expect(message.contentTopic).to.eq(
|
||||
options.expectedContentTopic || TestContentTopic,
|
||||
`Message content topic mismatch. Expected: ${
|
||||
options.expectedContentTopic || TestContentTopic
|
||||
}. Got: ${message.contentTopic}`
|
||||
);
|
||||
|
||||
expect(message.pubSubTopic).to.eq(
|
||||
options.expectedPubSubTopic || DefaultPubSubTopic,
|
||||
`Message pub/sub topic mismatch. Expected: ${
|
||||
options.expectedPubSubTopic || DefaultPubSubTopic
|
||||
}. Got: ${message.pubSubTopic}`
|
||||
);
|
||||
|
||||
expect(bytesToUtf8(message.payload)).to.eq(
|
||||
options.expectedMessageText || messageText,
|
||||
`Message text mismatch. Expected: ${
|
||||
options.expectedMessageText || messageText
|
||||
}. Got: ${bytesToUtf8(message.payload)}`
|
||||
);
|
||||
|
||||
expect(message.version).to.eq(
|
||||
options.expectedVersion || 0,
|
||||
`Message version mismatch. Expected: ${
|
||||
options.expectedVersion || 0
|
||||
}. Got: ${message.version}`
|
||||
);
|
||||
|
||||
const shouldCheckTimestamp =
|
||||
options.checkTimestamp !== undefined ? options.checkTimestamp : true;
|
||||
if (shouldCheckTimestamp && message.timestamp) {
|
||||
const now = Date.now();
|
||||
const tenSecondsAgo = now - 10000;
|
||||
expect(message.timestamp.getTime()).to.be.within(
|
||||
tenSecondsAgo,
|
||||
now,
|
||||
`Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp.getTime()}`
|
||||
);
|
||||
}
|
||||
|
||||
expect([
|
||||
options.expectedMeta,
|
||||
undefined,
|
||||
new Uint8Array(0)
|
||||
]).to.deep.include(
|
||||
message.meta,
|
||||
`Message meta mismatch. Expected: ${
|
||||
options.expectedMeta
|
||||
? JSON.stringify(options.expectedMeta)
|
||||
: "undefined or " + JSON.stringify(new Uint8Array(0))
|
||||
}. Got: ${JSON.stringify(message.meta)}`
|
||||
);
|
||||
|
||||
expect(message.ephemeral).to.eq(
|
||||
options.expectedEphemeral !== undefined
|
||||
? options.expectedEphemeral
|
||||
: false,
|
||||
`Message ephemeral value mismatch. Expected: ${
|
||||
options.expectedEphemeral !== undefined
|
||||
? options.expectedEphemeral
|
||||
: false
|
||||
}. Got: ${message.ephemeral}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Utility to generate test data for multiple topics tests.
|
||||
export function generateTestData(topicCount: number): {
|
||||
contentTopics: string[];
|
||||
encoders: Encoder[];
|
||||
decoders: Decoder[];
|
||||
} {
|
||||
const contentTopics = Array.from(
|
||||
{ length: topicCount },
|
||||
(_, i) => `/test/${i + 1}/waku-multi`
|
||||
);
|
||||
const encoders = contentTopics.map((topic) =>
|
||||
createEncoder({ contentTopic: topic })
|
||||
);
|
||||
const decoders = contentTopics.map((topic) => createDecoder(topic));
|
||||
return {
|
||||
contentTopics,
|
||||
encoders,
|
||||
decoders
|
||||
};
|
||||
}
|
||||
|
||||
// Utility to validate errors related to pings in the subscription.
|
||||
export async function validatePingError(
|
||||
subscription: IFilterSubscription
|
||||
): Promise<void> {
|
||||
try {
|
||||
await subscription.ping();
|
||||
throw new Error(
|
||||
"Ping was successful but was expected to fail with a specific error."
|
||||
);
|
||||
} catch (err) {
|
||||
if (
|
||||
err instanceof Error &&
|
||||
err.message.includes("peer has no subscriptions")
|
||||
) {
|
||||
return;
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface SetupReturn {
|
||||
nwaku: NimGoNode;
|
||||
waku: LightNode;
|
||||
subscription: IFilterSubscription;
|
||||
messageCollector: MessageCollector;
|
||||
}
|
||||
|
||||
// Setup before each test to initialize nodes and message collector.
|
||||
export async function setupNodes(currentTest: Context): Promise<SetupReturn> {
|
||||
const nwaku = new NimGoNode(makeLogFileName(currentTest));
|
||||
// Sometimes the node setup fails, when that happens we retry it max 3 times.
|
||||
await pRetry(
|
||||
async () => {
|
||||
try {
|
||||
await nwaku.start({
|
||||
filter: true,
|
||||
lightpush: true,
|
||||
relay: true
|
||||
});
|
||||
} catch (error) {
|
||||
log("nwaku node failed to start:", error);
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
{ retries: 3 }
|
||||
);
|
||||
|
||||
let waku: LightNode | undefined;
|
||||
try {
|
||||
waku = await createLightNode({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||
});
|
||||
await waku.start();
|
||||
} catch (error) {
|
||||
log("jswaku node failed to start:", error);
|
||||
}
|
||||
if (waku) {
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
|
||||
const subscription = await waku.filter.createSubscription();
|
||||
const messageCollector = new MessageCollector();
|
||||
|
||||
return { nwaku, waku, subscription, messageCollector };
|
||||
} else {
|
||||
throw new Error("Failed to initialize waku");
|
||||
}
|
||||
}
|
||||
|
||||
export function tearDownNodes(
|
||||
nwaku: NimGoNode,
|
||||
waku: LightNode,
|
||||
nwaku2?: NimGoNode
|
||||
): void {
|
||||
!!nwaku && nwaku.stop().catch((e) => log("Nwaku failed to stop", e));
|
||||
!!nwaku2 && nwaku2.stop().catch((e) => log("Nwaku2 failed to stop", e));
|
||||
!!waku && waku.stop().catch((e) => log("Waku failed to stop", e));
|
||||
}
|
|
@ -2,17 +2,15 @@ import type { IFilterSubscription, LightNode } from "@waku/interfaces";
|
|||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
|
||||
import { NimGoNode } from "../../src/index.js";
|
||||
import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js";
|
||||
|
||||
import {
|
||||
MessageCollector,
|
||||
setupNodes,
|
||||
tearDownNodes,
|
||||
runNodes,
|
||||
TestContentTopic,
|
||||
TestDecoder,
|
||||
TestEncoder,
|
||||
validatePingError
|
||||
} from "./filter_test_utils.js";
|
||||
} from "./utils.js";
|
||||
|
||||
describe("Waku Filter V2: Ping", function () {
|
||||
// Set the timeout for all tests in this suite. Can be overwritten at test level
|
||||
|
@ -22,17 +20,15 @@ describe("Waku Filter V2: Ping", function () {
|
|||
let subscription: IFilterSubscription;
|
||||
let messageCollector: MessageCollector;
|
||||
|
||||
this.afterEach(async function () {
|
||||
tearDownNodes(nwaku, waku);
|
||||
});
|
||||
|
||||
this.beforeEach(async function () {
|
||||
this.timeout(15000);
|
||||
const setup = await setupNodes(this);
|
||||
nwaku = setup.nwaku;
|
||||
waku = setup.waku;
|
||||
subscription = setup.subscription;
|
||||
messageCollector = setup.messageCollector;
|
||||
[nwaku, waku] = await runNodes(this);
|
||||
subscription = await waku.filter.createSubscription();
|
||||
messageCollector = new MessageCollector(TestContentTopic);
|
||||
});
|
||||
|
||||
this.afterEach(async function () {
|
||||
tearDownNodes([nwaku], [waku]);
|
||||
});
|
||||
|
||||
it("Ping on subscribed peer", async function () {
|
||||
|
|
|
@ -6,20 +6,20 @@ import { expect } from "chai";
|
|||
|
||||
import {
|
||||
delay,
|
||||
MessageCollector,
|
||||
NimGoNode,
|
||||
tearDownNodes,
|
||||
TEST_STRING,
|
||||
TEST_TIMESTAMPS
|
||||
} from "../../src/index.js";
|
||||
|
||||
import {
|
||||
MessageCollector,
|
||||
messageText,
|
||||
setupNodes,
|
||||
tearDownNodes,
|
||||
runNodes,
|
||||
TestContentTopic,
|
||||
TestDecoder,
|
||||
TestEncoder
|
||||
} from "./filter_test_utils.js";
|
||||
} from "./utils.js";
|
||||
|
||||
describe("Waku Filter V2: FilterPush", function () {
|
||||
// Set the timeout for all tests in this suite. Can be overwritten at test level
|
||||
|
@ -29,17 +29,15 @@ describe("Waku Filter V2: FilterPush", function () {
|
|||
let subscription: IFilterSubscription;
|
||||
let messageCollector: MessageCollector;
|
||||
|
||||
this.afterEach(async function () {
|
||||
tearDownNodes(nwaku, waku);
|
||||
});
|
||||
|
||||
this.beforeEach(async function () {
|
||||
this.timeout(15000);
|
||||
const setup = await setupNodes(this);
|
||||
nwaku = setup.nwaku;
|
||||
waku = setup.waku;
|
||||
subscription = setup.subscription;
|
||||
messageCollector = setup.messageCollector;
|
||||
[nwaku, waku] = await runNodes(this);
|
||||
subscription = await waku.filter.createSubscription();
|
||||
messageCollector = new MessageCollector(TestContentTopic);
|
||||
});
|
||||
|
||||
this.afterEach(async function () {
|
||||
tearDownNodes([nwaku], [waku]);
|
||||
});
|
||||
|
||||
TEST_STRING.forEach((testItem) => {
|
||||
|
@ -50,8 +48,7 @@ describe("Waku Filter V2: FilterPush", function () {
|
|||
});
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({
|
||||
index: 0,
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: testItem.value
|
||||
});
|
||||
});
|
||||
|
@ -72,8 +69,8 @@ describe("Waku Filter V2: FilterPush", function () {
|
|||
]);
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({
|
||||
index: 0,
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText,
|
||||
checkTimestamp: false
|
||||
});
|
||||
|
||||
|
@ -82,7 +79,7 @@ describe("Waku Filter V2: FilterPush", function () {
|
|||
if (testItem == undefined) {
|
||||
expect(timestamp).to.eq(undefined);
|
||||
}
|
||||
if (timestamp !== undefined) {
|
||||
if (timestamp !== undefined && timestamp instanceof Date) {
|
||||
expect(testItem?.toString()).to.contain(timestamp.getTime().toString());
|
||||
}
|
||||
});
|
||||
|
@ -219,7 +216,9 @@ describe("Waku Filter V2: FilterPush", function () {
|
|||
]);
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({ index: 0 });
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
});
|
||||
});
|
||||
|
||||
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
|
||||
|
@ -245,12 +244,10 @@ describe("Waku Filter V2: FilterPush", function () {
|
|||
|
||||
// Confirm both messages were received.
|
||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({
|
||||
index: 0,
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M1"
|
||||
});
|
||||
messageCollector.verifyReceivedMessage({
|
||||
index: 1,
|
||||
messageCollector.verifyReceivedMessage(1, {
|
||||
expectedMessageText: "M2"
|
||||
});
|
||||
});
|
||||
|
@ -270,12 +267,10 @@ describe("Waku Filter V2: FilterPush", function () {
|
|||
|
||||
// Confirm both messages were received.
|
||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({
|
||||
index: 0,
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M1"
|
||||
});
|
||||
messageCollector.verifyReceivedMessage({
|
||||
index: 1,
|
||||
messageCollector.verifyReceivedMessage(1, {
|
||||
expectedMessageText: "M2"
|
||||
});
|
||||
});
|
||||
|
|
|
@ -12,21 +12,21 @@ import { expect } from "chai";
|
|||
import {
|
||||
delay,
|
||||
makeLogFileName,
|
||||
MessageCollector,
|
||||
NimGoNode,
|
||||
tearDownNodes,
|
||||
TEST_STRING
|
||||
} from "../../src/index.js";
|
||||
|
||||
import {
|
||||
generateTestData,
|
||||
MessageCollector,
|
||||
messagePayload,
|
||||
messageText,
|
||||
setupNodes,
|
||||
tearDownNodes,
|
||||
runNodes,
|
||||
TestContentTopic,
|
||||
TestDecoder,
|
||||
TestEncoder
|
||||
} from "./filter_test_utils.js";
|
||||
} from "./utils.js";
|
||||
|
||||
describe("Waku Filter V2: Subscribe", function () {
|
||||
// Set the timeout for all tests in this suite. Can be overwritten at test level
|
||||
|
@ -37,17 +37,18 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||
let subscription: IFilterSubscription;
|
||||
let messageCollector: MessageCollector;
|
||||
|
||||
this.afterEach(async function () {
|
||||
tearDownNodes(nwaku, waku, nwaku2);
|
||||
});
|
||||
|
||||
this.beforeEach(async function () {
|
||||
this.timeout(15000);
|
||||
const setup = await setupNodes(this);
|
||||
nwaku = setup.nwaku;
|
||||
waku = setup.waku;
|
||||
subscription = setup.subscription;
|
||||
messageCollector = setup.messageCollector;
|
||||
[nwaku, waku] = await runNodes(this);
|
||||
subscription = await waku.filter.createSubscription();
|
||||
messageCollector = new MessageCollector(TestContentTopic);
|
||||
|
||||
// Nwaku subscribe to the default pubsub topic
|
||||
await nwaku.ensureSubscriptions();
|
||||
});
|
||||
|
||||
this.afterEach(async function () {
|
||||
tearDownNodes([nwaku, nwaku2], [waku]);
|
||||
});
|
||||
|
||||
it("Subscribe and receive messages via lightPush", async function () {
|
||||
|
@ -56,7 +57,9 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||
await waku.lightPush.send(TestEncoder, messagePayload);
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({ index: 0 });
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
});
|
||||
expect((await nwaku.messages()).length).to.eq(1);
|
||||
});
|
||||
|
||||
|
@ -74,7 +77,9 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||
);
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({ index: 0 });
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
});
|
||||
expect((await nwaku.messages()).length).to.eq(1);
|
||||
});
|
||||
|
||||
|
@ -84,7 +89,9 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||
await waku.lightPush.send(TestEncoder, messagePayload);
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({ index: 0 });
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
});
|
||||
|
||||
// Send another message on the same topic.
|
||||
const newMessageText = "Filtering still works!";
|
||||
|
@ -94,9 +101,8 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||
|
||||
// Verify that the second message was successfully received.
|
||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({
|
||||
expectedMessageText: newMessageText,
|
||||
index: 1
|
||||
messageCollector.verifyReceivedMessage(1, {
|
||||
expectedMessageText: newMessageText
|
||||
});
|
||||
expect((await nwaku.messages()).length).to.eq(2);
|
||||
});
|
||||
|
@ -106,7 +112,9 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
||||
await waku.lightPush.send(TestEncoder, messagePayload);
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({ index: 0 });
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
});
|
||||
|
||||
// Modify subscription to include a new content topic and send a message.
|
||||
const newMessageText = "Filtering still works!";
|
||||
|
@ -119,18 +127,16 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||
payload: utf8ToBytes(newMessageText)
|
||||
});
|
||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({
|
||||
messageCollector.verifyReceivedMessage(1, {
|
||||
expectedContentTopic: newContentTopic,
|
||||
expectedMessageText: newMessageText,
|
||||
index: 1
|
||||
expectedMessageText: newMessageText
|
||||
});
|
||||
|
||||
// Send another message on the initial content topic to verify it still works.
|
||||
await waku.lightPush.send(TestEncoder, newMessagePayload);
|
||||
expect(await messageCollector.waitForMessages(3)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({
|
||||
expectedMessageText: newMessageText,
|
||||
index: 2
|
||||
messageCollector.verifyReceivedMessage(2, {
|
||||
expectedMessageText: newMessageText
|
||||
});
|
||||
expect((await nwaku.messages()).length).to.eq(3);
|
||||
});
|
||||
|
@ -154,10 +160,9 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||
// Verify that each message was received on the corresponding topic.
|
||||
expect(await messageCollector.waitForMessages(20)).to.eq(true);
|
||||
td.contentTopics.forEach((topic, index) => {
|
||||
messageCollector.verifyReceivedMessage({
|
||||
messageCollector.verifyReceivedMessage(index, {
|
||||
expectedContentTopic: topic,
|
||||
expectedMessageText: `Message for Topic ${index + 1}`,
|
||||
index: index
|
||||
expectedMessageText: `Message for Topic ${index + 1}`
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -179,10 +184,9 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||
// Verify that each message was received on the corresponding topic.
|
||||
expect(await messageCollector.waitForMessages(30)).to.eq(true);
|
||||
td.contentTopics.forEach((topic, index) => {
|
||||
messageCollector.verifyReceivedMessage({
|
||||
messageCollector.verifyReceivedMessage(index, {
|
||||
expectedContentTopic: topic,
|
||||
expectedMessageText: `Message for Topic ${index + 1}`,
|
||||
index: index
|
||||
expectedMessageText: `Message for Topic ${index + 1}`
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -253,12 +257,10 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||
|
||||
// Confirm both messages were received.
|
||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({
|
||||
index: 0,
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M1"
|
||||
});
|
||||
messageCollector.verifyReceivedMessage({
|
||||
index: 1,
|
||||
messageCollector.verifyReceivedMessage(1, {
|
||||
expectedMessageText: "M2"
|
||||
});
|
||||
});
|
||||
|
@ -273,8 +275,8 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||
await waku.lightPush.send(newEncoder, messagePayload);
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({
|
||||
index: 0,
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: newContentTopic
|
||||
});
|
||||
});
|
||||
|
@ -295,12 +297,10 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||
|
||||
// Check if both messages were received
|
||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({
|
||||
index: 0,
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M1"
|
||||
});
|
||||
messageCollector.verifyReceivedMessage({
|
||||
index: 1,
|
||||
messageCollector.verifyReceivedMessage(1, {
|
||||
expectedContentTopic: newContentTopic,
|
||||
expectedMessageText: "M2"
|
||||
});
|
||||
|
@ -332,12 +332,10 @@ describe("Waku Filter V2: Subscribe", function () {
|
|||
|
||||
// Check if both messages were received
|
||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage({
|
||||
index: 0,
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M1"
|
||||
});
|
||||
messageCollector.verifyReceivedMessage({
|
||||
index: 1,
|
||||
messageCollector.verifyReceivedMessage(1, {
|
||||
expectedContentTopic: newContentTopic,
|
||||
expectedMessageText: "M2"
|
||||
});
|
||||
|
|
|
@ -3,18 +3,17 @@ import type { IFilterSubscription, LightNode } from "@waku/interfaces";
|
|||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
|
||||
import { NimGoNode } from "../../src/index.js";
|
||||
import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js";
|
||||
|
||||
import {
|
||||
generateTestData,
|
||||
MessageCollector,
|
||||
messagePayload,
|
||||
setupNodes,
|
||||
tearDownNodes,
|
||||
messageText,
|
||||
runNodes,
|
||||
TestContentTopic,
|
||||
TestDecoder,
|
||||
TestEncoder
|
||||
} from "./filter_test_utils.js";
|
||||
} from "./utils.js";
|
||||
|
||||
describe("Waku Filter V2: Unsubscribe", function () {
|
||||
// Set the timeout for all tests in this suite. Can be overwritten at test level
|
||||
|
@ -24,17 +23,18 @@ describe("Waku Filter V2: Unsubscribe", function () {
|
|||
let subscription: IFilterSubscription;
|
||||
let messageCollector: MessageCollector;
|
||||
|
||||
this.afterEach(async function () {
|
||||
tearDownNodes(nwaku, waku);
|
||||
});
|
||||
|
||||
this.beforeEach(async function () {
|
||||
this.timeout(15000);
|
||||
const setup = await setupNodes(this);
|
||||
nwaku = setup.nwaku;
|
||||
waku = setup.waku;
|
||||
subscription = setup.subscription;
|
||||
messageCollector = setup.messageCollector;
|
||||
[nwaku, waku] = await runNodes(this);
|
||||
subscription = await waku.filter.createSubscription();
|
||||
messageCollector = new MessageCollector(TestContentTopic);
|
||||
|
||||
// Nwaku subscribe to the default pubsub topic
|
||||
await nwaku.ensureSubscriptions();
|
||||
});
|
||||
|
||||
this.afterEach(async function () {
|
||||
tearDownNodes([nwaku], [waku]);
|
||||
});
|
||||
|
||||
it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {
|
||||
|
@ -48,7 +48,9 @@ describe("Waku Filter V2: Unsubscribe", function () {
|
|||
expect(await messageCollector.waitForMessages(2)).to.eq(false);
|
||||
|
||||
// Check that from 2 messages send only the 1st was received
|
||||
messageCollector.verifyReceivedMessage({ index: 0 });
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
});
|
||||
expect(messageCollector.count).to.eq(1);
|
||||
expect((await nwaku.messages()).length).to.eq(2);
|
||||
});
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
import {
|
||||
createDecoder,
|
||||
createEncoder,
|
||||
Decoder,
|
||||
Encoder,
|
||||
waitForRemotePeer
|
||||
} from "@waku/core";
|
||||
import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces";
|
||||
import { createLightNode } from "@waku/sdk";
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import debug from "debug";
|
||||
import { Context } from "mocha";
|
||||
|
||||
import { makeLogFileName, NimGoNode, NOISE_KEY_1 } from "../../src/index.js";
|
||||
|
||||
// Constants for test configuration.
|
||||
export const log = debug("waku:test:filter");
|
||||
export const TestContentTopic = "/test/1/waku-filter";
|
||||
export const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
|
||||
export const TestDecoder = createDecoder(TestContentTopic);
|
||||
export const messageText = "Filtering works!";
|
||||
export const messagePayload = { payload: utf8ToBytes(messageText) };
|
||||
|
||||
// Utility to generate test data for multiple topics tests.
|
||||
export function generateTestData(topicCount: number): {
|
||||
contentTopics: string[];
|
||||
encoders: Encoder[];
|
||||
decoders: Decoder[];
|
||||
} {
|
||||
const contentTopics = Array.from(
|
||||
{ length: topicCount },
|
||||
(_, i) => `/test/${i + 1}/waku-multi`
|
||||
);
|
||||
const encoders = contentTopics.map((topic) =>
|
||||
createEncoder({ contentTopic: topic })
|
||||
);
|
||||
const decoders = contentTopics.map((topic) => createDecoder(topic));
|
||||
return {
|
||||
contentTopics,
|
||||
encoders,
|
||||
decoders
|
||||
};
|
||||
}
|
||||
|
||||
// Utility to validate errors related to pings in the subscription.
|
||||
export async function validatePingError(
|
||||
subscription: IFilterSubscription
|
||||
): Promise<void> {
|
||||
try {
|
||||
await subscription.ping();
|
||||
throw new Error(
|
||||
"Ping was successful but was expected to fail with a specific error."
|
||||
);
|
||||
} catch (err) {
|
||||
if (
|
||||
err instanceof Error &&
|
||||
err.message.includes("peer has no subscriptions")
|
||||
) {
|
||||
return;
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function runNodes(
|
||||
currentTest: Context
|
||||
): Promise<[NimGoNode, LightNode]> {
|
||||
const nwaku = new NimGoNode(makeLogFileName(currentTest));
|
||||
await nwaku.startWithRetries(
|
||||
{
|
||||
filter: true,
|
||||
lightpush: true,
|
||||
relay: true
|
||||
},
|
||||
{ retries: 3 }
|
||||
);
|
||||
|
||||
let waku: LightNode | undefined;
|
||||
try {
|
||||
waku = await createLightNode({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||
});
|
||||
await waku.start();
|
||||
} catch (error) {
|
||||
log("jswaku node failed to start:", error);
|
||||
}
|
||||
|
||||
if (waku) {
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
|
||||
return [nwaku, waku];
|
||||
} else {
|
||||
throw new Error("Failed to initialize waku");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
import { LightNode } from "@waku/interfaces";
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
|
||||
import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js";
|
||||
|
||||
import {
|
||||
messageText,
|
||||
runNodes,
|
||||
TestContentTopic,
|
||||
TestEncoder
|
||||
} from "./utils.js";
|
||||
|
||||
describe("Waku Light Push [node only] - custom pubsub topic", function () {
|
||||
this.timeout(15000);
|
||||
let waku: LightNode;
|
||||
let nwaku: NimGoNode;
|
||||
let messageCollector: MessageCollector;
|
||||
const customPubSubTopic = "/waku/2/custom-dapp/proto";
|
||||
|
||||
beforeEach(async function () {
|
||||
[nwaku, waku] = await runNodes(this, customPubSubTopic);
|
||||
messageCollector = new MessageCollector(
|
||||
TestContentTopic,
|
||||
nwaku,
|
||||
customPubSubTopic
|
||||
);
|
||||
});
|
||||
|
||||
this.afterEach(async function () {
|
||||
tearDownNodes([nwaku], [waku]);
|
||||
});
|
||||
|
||||
it("Push message", async function () {
|
||||
const nimPeerId = await nwaku.getPeerId();
|
||||
|
||||
const pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: utf8ToBytes(messageText)
|
||||
});
|
||||
|
||||
expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString());
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,234 @@
|
|||
import { createEncoder, DefaultPubSubTopic } from "@waku/core";
|
||||
import { IRateLimitProof, LightNode, SendError } from "@waku/interfaces";
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
MessageCollector,
|
||||
NimGoNode,
|
||||
tearDownNodes,
|
||||
TEST_STRING
|
||||
} from "../../src/index.js";
|
||||
import { generateRandomUint8Array } from "../../src/random_array.js";
|
||||
|
||||
import {
|
||||
messagePayload,
|
||||
messageText,
|
||||
runNodes,
|
||||
TestContentTopic,
|
||||
TestEncoder
|
||||
} from "./utils.js";
|
||||
|
||||
describe("Waku Light Push [node only]", function () {
|
||||
// Set the timeout for all tests in this suite. Can be overwritten at test level
|
||||
this.timeout(15000);
|
||||
let waku: LightNode;
|
||||
let nwaku: NimGoNode;
|
||||
let messageCollector: MessageCollector;
|
||||
|
||||
this.beforeEach(async function () {
|
||||
this.timeout(15000);
|
||||
[nwaku, waku] = await runNodes(this);
|
||||
messageCollector = new MessageCollector(
|
||||
TestContentTopic,
|
||||
nwaku,
|
||||
DefaultPubSubTopic
|
||||
);
|
||||
});
|
||||
|
||||
this.afterEach(async function () {
|
||||
tearDownNodes([nwaku], [waku]);
|
||||
});
|
||||
|
||||
TEST_STRING.forEach((testItem) => {
|
||||
it(`Push message with ${testItem.description} payload`, async function () {
|
||||
const pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: utf8ToBytes(testItem.value)
|
||||
});
|
||||
expect(pushResponse.recipients.length).to.eq(1);
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: testItem.value
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("Push 30 different messages", async function () {
|
||||
const generateMessageText = (index: number): string => `M${index}`;
|
||||
|
||||
for (let i = 0; i < 30; i++) {
|
||||
const pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: utf8ToBytes(generateMessageText(i))
|
||||
});
|
||||
expect(pushResponse.recipients.length).to.eq(1);
|
||||
}
|
||||
|
||||
expect(await messageCollector.waitForMessages(30)).to.eq(true);
|
||||
|
||||
for (let i = 0; i < 30; i++) {
|
||||
messageCollector.verifyReceivedMessage(i, {
|
||||
expectedMessageText: generateMessageText(i)
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
it("Fails to push message with empty payload", async function () {
|
||||
const pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: utf8ToBytes("")
|
||||
});
|
||||
|
||||
if (nwaku.type() == "go-waku") {
|
||||
expect(pushResponse.recipients.length).to.eq(1);
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: undefined
|
||||
});
|
||||
} else {
|
||||
expect(pushResponse.recipients.length).to.eq(0);
|
||||
expect(pushResponse.errors).to.include(SendError.NO_RPC_RESPONSE);
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
||||
}
|
||||
});
|
||||
|
||||
TEST_STRING.forEach((testItem) => {
|
||||
it(`Push message with content topic containing ${testItem.description}`, async function () {
|
||||
const customEncoder = createEncoder({
|
||||
contentTopic: testItem.value
|
||||
});
|
||||
const pushResponse = await waku.lightPush.send(
|
||||
customEncoder,
|
||||
messagePayload
|
||||
);
|
||||
expect(pushResponse.recipients.length).to.eq(1);
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: testItem.value
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("Fails to push message with empty content topic", async function () {
|
||||
try {
|
||||
createEncoder({ contentTopic: "" });
|
||||
expect.fail("Expected an error but didn't get one");
|
||||
} catch (error) {
|
||||
expect((error as Error).message).to.equal(
|
||||
"Content topic must be specified"
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
it("Push message with meta", async function () {
|
||||
const customTestEncoder = createEncoder({
|
||||
contentTopic: TestContentTopic,
|
||||
metaSetter: () => new Uint8Array(10)
|
||||
});
|
||||
|
||||
const pushResponse = await waku.lightPush.send(
|
||||
customTestEncoder,
|
||||
messagePayload
|
||||
);
|
||||
expect(pushResponse.recipients.length).to.eq(1);
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
});
|
||||
});
|
||||
|
||||
it("Fails to push message with large meta", async function () {
|
||||
const customTestEncoder = createEncoder({
|
||||
contentTopic: TestContentTopic,
|
||||
metaSetter: () => new Uint8Array(10 ** 6)
|
||||
});
|
||||
|
||||
const pushResponse = await waku.lightPush.send(
|
||||
customTestEncoder,
|
||||
messagePayload
|
||||
);
|
||||
|
||||
if (nwaku.type() == "go-waku") {
|
||||
expect(pushResponse.recipients.length).to.eq(1);
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
});
|
||||
} else {
|
||||
expect(pushResponse.recipients.length).to.eq(0);
|
||||
expect(pushResponse.errors).to.include(SendError.NO_RPC_RESPONSE);
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
||||
}
|
||||
});
|
||||
|
||||
it("Push message with rate limit", async function () {
|
||||
const rateLimitProof: IRateLimitProof = {
|
||||
proof: utf8ToBytes("proofData"),
|
||||
merkleRoot: utf8ToBytes("merkleRootData"),
|
||||
epoch: utf8ToBytes("epochData"),
|
||||
shareX: utf8ToBytes("shareXData"),
|
||||
shareY: utf8ToBytes("shareYData"),
|
||||
nullifier: utf8ToBytes("nullifierData"),
|
||||
rlnIdentifier: utf8ToBytes("rlnIdentifierData")
|
||||
};
|
||||
|
||||
const pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: utf8ToBytes(messageText),
|
||||
rateLimitProof: rateLimitProof
|
||||
});
|
||||
expect(pushResponse.recipients.length).to.eq(1);
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
});
|
||||
});
|
||||
|
||||
[
|
||||
Date.now() - 3600000 * 24 * 356,
|
||||
Date.now() - 3600000,
|
||||
Date.now() + 3600000
|
||||
].forEach((testItem) => {
|
||||
it(`Push message with custom timestamp: ${testItem}`, async function () {
|
||||
const customTimeNanos = BigInt(testItem) * BigInt(1000000);
|
||||
const pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: utf8ToBytes(messageText),
|
||||
timestamp: new Date(testItem)
|
||||
});
|
||||
expect(pushResponse.recipients.length).to.eq(1);
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText,
|
||||
expectedTimestamp: customTimeNanos
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("Push message equal or less that 1MB", async function () {
|
||||
const oneMbPayload = generateRandomUint8Array(1024 ** 2);
|
||||
let pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: oneMbPayload
|
||||
});
|
||||
expect(pushResponse.recipients.length).to.greaterThan(0);
|
||||
|
||||
const bigPayload = generateRandomUint8Array(65536);
|
||||
pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: bigPayload
|
||||
});
|
||||
expect(pushResponse.recipients.length).to.greaterThan(0);
|
||||
});
|
||||
|
||||
it("Fails to push message bigger that 1MB", async function () {
|
||||
const MB = 1024 ** 2;
|
||||
|
||||
const pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: generateRandomUint8Array(MB + 65536)
|
||||
});
|
||||
expect(pushResponse.recipients.length).to.eq(0);
|
||||
expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG);
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
||||
});
|
||||
});
|
|
@ -0,0 +1,48 @@
|
|||
import { createEncoder, waitForRemotePeer } from "@waku/core";
|
||||
import { LightNode, Protocols } from "@waku/interfaces";
|
||||
import { createLightNode, utf8ToBytes } from "@waku/sdk";
|
||||
import debug from "debug";
|
||||
|
||||
import { makeLogFileName, NimGoNode, NOISE_KEY_1 } from "../../src/index.js";
|
||||
|
||||
// Constants for test configuration.
|
||||
export const log = debug("waku:test:lightpush");
|
||||
export const TestContentTopic = "/test/1/waku-light-push/utf8";
|
||||
export const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
|
||||
export const messageText = "Light Push works!";
|
||||
export const messagePayload = { payload: utf8ToBytes(messageText) };
|
||||
|
||||
export async function runNodes(
|
||||
context: Mocha.Context,
|
||||
pubSubTopic?: string
|
||||
): Promise<[NimGoNode, LightNode]> {
|
||||
const nwakuOptional = pubSubTopic ? { topic: pubSubTopic } : {};
|
||||
const nwaku = new NimGoNode(makeLogFileName(context));
|
||||
await nwaku.startWithRetries(
|
||||
{
|
||||
lightpush: true,
|
||||
relay: true,
|
||||
...nwakuOptional
|
||||
},
|
||||
{ retries: 3 }
|
||||
);
|
||||
|
||||
let waku: LightNode | undefined;
|
||||
try {
|
||||
waku = await createLightNode({
|
||||
pubSubTopic,
|
||||
staticNoiseKey: NOISE_KEY_1
|
||||
});
|
||||
await waku.start();
|
||||
} catch (error) {
|
||||
log("jswaku node failed to start:", error);
|
||||
}
|
||||
|
||||
if (waku) {
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.LightPush]);
|
||||
return [nwaku, waku];
|
||||
} else {
|
||||
throw new Error("Failed to initialize waku");
|
||||
}
|
||||
}
|
|
@ -1,158 +0,0 @@
|
|||
import { createEncoder, waitForRemotePeer } from "@waku/core";
|
||||
import { LightNode, SendError } from "@waku/interfaces";
|
||||
import { Protocols } from "@waku/interfaces";
|
||||
import { createLightNode } from "@waku/sdk";
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
import debug from "debug";
|
||||
|
||||
import {
|
||||
base64ToUtf8,
|
||||
delay,
|
||||
makeLogFileName,
|
||||
NimGoNode,
|
||||
NOISE_KEY_1
|
||||
} from "../src/index.js";
|
||||
import { MessageRpcResponse } from "../src/node/interfaces.js";
|
||||
import { generateRandomUint8Array } from "../src/random_array.js";
|
||||
|
||||
const log = debug("waku:test:lightpush");
|
||||
|
||||
const TestContentTopic = "/test/1/waku-light-push/utf8";
|
||||
const TestEncoder = createEncoder({
|
||||
contentTopic: TestContentTopic
|
||||
});
|
||||
|
||||
async function runNodes(
|
||||
context: Mocha.Context,
|
||||
pubSubTopic?: string
|
||||
): Promise<[NimGoNode, LightNode]> {
|
||||
const nwakuOptional = pubSubTopic ? { topic: pubSubTopic } : {};
|
||||
const nwaku = new NimGoNode(makeLogFileName(context));
|
||||
await nwaku.start({
|
||||
lightpush: true,
|
||||
relay: true,
|
||||
...nwakuOptional
|
||||
});
|
||||
|
||||
const waku = await createLightNode({
|
||||
pubSubTopic,
|
||||
staticNoiseKey: NOISE_KEY_1
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.LightPush]);
|
||||
|
||||
return [nwaku, waku];
|
||||
}
|
||||
|
||||
describe("Waku Light Push [node only]", () => {
|
||||
let waku: LightNode;
|
||||
let nwaku: NimGoNode;
|
||||
|
||||
beforeEach(async function () {
|
||||
this.timeout(15_000);
|
||||
[nwaku, waku] = await runNodes(this);
|
||||
});
|
||||
|
||||
afterEach(async function () {
|
||||
try {
|
||||
await nwaku?.stop();
|
||||
await waku?.stop();
|
||||
} catch (e) {
|
||||
console.error("Failed to stop nodes: ", e);
|
||||
}
|
||||
});
|
||||
|
||||
it("Push successfully", async function () {
|
||||
this.timeout(15_000);
|
||||
|
||||
const messageText = "Light Push works!";
|
||||
|
||||
const pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: utf8ToBytes(messageText)
|
||||
});
|
||||
expect(pushResponse.recipients.length).to.eq(1);
|
||||
|
||||
let msgs: MessageRpcResponse[] = [];
|
||||
|
||||
while (msgs.length === 0) {
|
||||
await delay(200);
|
||||
msgs = await nwaku.messages();
|
||||
}
|
||||
|
||||
expect(msgs[0].contentTopic).to.equal(TestContentTopic);
|
||||
expect(base64ToUtf8(msgs[0].payload)).to.equal(messageText);
|
||||
});
|
||||
|
||||
it("Pushes messages equal or less that 1MB", async function () {
|
||||
this.timeout(15_000);
|
||||
const MB = 1024 ** 2;
|
||||
|
||||
let pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: generateRandomUint8Array(MB)
|
||||
});
|
||||
expect(pushResponse.recipients.length).to.greaterThan(0);
|
||||
|
||||
pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: generateRandomUint8Array(65536)
|
||||
});
|
||||
expect(pushResponse.recipients.length).to.greaterThan(0);
|
||||
});
|
||||
|
||||
it("Fails to push message bigger that 1MB", async function () {
|
||||
this.timeout(15_000);
|
||||
const MB = 1024 ** 2;
|
||||
|
||||
const pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: generateRandomUint8Array(MB + 65536)
|
||||
});
|
||||
expect(pushResponse.recipients.length).to.eq(0);
|
||||
expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Waku Light Push [node only] - custom pubsub topic", () => {
|
||||
let waku: LightNode;
|
||||
let nwaku: NimGoNode;
|
||||
const customPubSubTopic = "/waku/2/custom-dapp/proto";
|
||||
|
||||
beforeEach(async function () {
|
||||
this.timeout(15_000);
|
||||
[nwaku, waku] = await runNodes(this, customPubSubTopic);
|
||||
});
|
||||
|
||||
afterEach(async function () {
|
||||
try {
|
||||
await nwaku?.stop();
|
||||
await waku?.stop();
|
||||
} catch (e) {
|
||||
console.error("Failed to stop nodes: ", e);
|
||||
}
|
||||
});
|
||||
|
||||
it("Push message", async function () {
|
||||
this.timeout(15_000);
|
||||
|
||||
const nimPeerId = await nwaku.getPeerId();
|
||||
const messageText = "Light Push works!";
|
||||
|
||||
log("Send message via lightpush");
|
||||
const pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||
payload: utf8ToBytes(messageText)
|
||||
});
|
||||
log("Ack received", pushResponse);
|
||||
expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString());
|
||||
|
||||
let msgs: MessageRpcResponse[] = [];
|
||||
|
||||
log("Waiting for message to show in nwaku");
|
||||
while (msgs.length === 0) {
|
||||
await delay(200);
|
||||
msgs = await nwaku.messages(customPubSubTopic);
|
||||
}
|
||||
|
||||
expect(msgs[0].contentTopic).to.equal(TestContentTopic);
|
||||
expect(base64ToUtf8(msgs[0].payload)).to.equal(messageText);
|
||||
});
|
||||
});
|
|
@ -411,6 +411,9 @@ describe("Waku Relay [node only]", () => {
|
|||
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Relay]);
|
||||
|
||||
// Nwaku subscribe to the default pubsub topic
|
||||
await nwaku.ensureSubscriptions();
|
||||
});
|
||||
|
||||
afterEach(async function () {
|
||||
|
|
Loading…
Reference in New Issue