feat: lightpush & filter send requests to multiple peers (#1779)

* feat: lightpush & filter send requests to multiple peers

* build message-hash before core

* chore: restructure folder heirrarchy

* fix: imports

* chore: move @waku/core to dev deps

* feat: create a new `ServiceNodes` wrapper class to encapsulate service node and message collector with redundancy accounted for

* chore(filter): move tests against single service node to a subdir

* feat: support relay, add strict checking, add tests

* fix(filter): handle errors

* chore(tests): add tests for ping

* add tests for push

* chore: abstract redundancy

* feat: add unsubscribe tests

* fix: tests

* add lightpush tests

* fix: imports

* fix: merge & add warning

* merge: master

* fix: breaking tests with master

* address comments

* make num peers configurable

* fix: typo
This commit is contained in:
Danish Arora 2024-01-24 18:24:03 +05:30 committed by GitHub
parent 3e7b95e604
commit 7affbe222d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 2159 additions and 197 deletions

4
package-lock.json generated
View File

@ -8,11 +8,11 @@
"workspaces": [
"packages/interfaces",
"packages/utils",
"packages/message-hash",
"packages/proto",
"packages/enr",
"packages/core",
"packages/relay",
"packages/message-hash",
"packages/peer-exchange",
"packages/dns-discovery",
"packages/message-encryption",
@ -27635,6 +27635,7 @@
"@noble/hashes": "^1.3.2",
"@waku/enr": "^0.0.20",
"@waku/interfaces": "0.0.21",
"@waku/message-hash": "^0.1.10",
"@waku/proto": "0.0.6",
"@waku/utils": "0.0.14",
"debug": "^4.3.4",
@ -31818,6 +31819,7 @@
"@waku/build-utils": "*",
"@waku/enr": "^0.0.20",
"@waku/interfaces": "0.0.21",
"@waku/message-hash": "^0.1.10",
"@waku/proto": "0.0.6",
"@waku/utils": "0.0.14",
"chai": "^4.3.10",

View File

@ -5,11 +5,11 @@
"workspaces": [
"packages/interfaces",
"packages/utils",
"packages/message-hash",
"packages/proto",
"packages/enr",
"packages/core",
"packages/relay",
"packages/message-hash",
"packages/peer-exchange",
"packages/dns-discovery",
"packages/message-encryption",

View File

@ -76,6 +76,7 @@
"@noble/hashes": "^1.3.2",
"@waku/enr": "^0.0.20",
"@waku/interfaces": "0.0.21",
"@waku/message-hash": "^0.1.10",
"@waku/proto": "0.0.6",
"@waku/utils": "0.0.14",
"debug": "^4.3.4",

View File

@ -17,6 +17,8 @@ import {
import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager.js";
const DEFAULT_NUM_PEERS_TO_USE = 3;
/**
* A class with predefined helpers, to be used as a base to implement Waku
* Protocols.
@ -24,6 +26,7 @@ import { StreamManager } from "./stream_manager.js";
export class BaseProtocol implements IBaseProtocol {
public readonly addLibp2pEventListener: Libp2p["addEventListener"];
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
readonly numPeersToUse: number;
protected streamManager: StreamManager;
protected pubsubTopics: PubsubTopic[];
@ -35,6 +38,8 @@ export class BaseProtocol implements IBaseProtocol {
) {
this.pubsubTopics = this.initializePubsubTopic(options);
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
);
@ -124,6 +129,12 @@ export class BaseProtocol implements IBaseProtocol {
);
}
if (sortedFilteredPeers.length < numPeers) {
this.log.warn(
`Only ${sortedFilteredPeers.length} peers found. Requested ${numPeers}.`
);
}
return sortedFilteredPeers;
}

View File

@ -11,13 +11,13 @@ import type {
IProtoMessage,
IReceiver,
Libp2p,
PeerIdStr,
ProtocolCreateOptions,
PubsubTopic,
SingleShardInfo,
Unsubscribe
} from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
import { WakuMessage } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
@ -50,10 +50,14 @@ export const FilterCodecs = {
PUSH: "/vac/waku/filter-push/2.0.0-beta1"
};
/**
* A subscription object refers to a subscription to a given pubsub topic.
*/
class Subscription {
private readonly peer: Peer;
readonly peers: Peer[];
private readonly pubsubTopic: PubsubTopic;
private newStream: (peer: Peer) => Promise<Stream>;
readonly receivedMessagesHashStr: string[] = [];
private subscriptionCallbacks: Map<
ContentTopic,
@ -62,10 +66,10 @@ class Subscription {
constructor(
pubsubTopic: PubsubTopic,
remotePeer: Peer,
remotePeers: Peer[],
newStream: (peer: Peer) => Promise<Stream>
) {
this.peer = remotePeer;
this.peers = remotePeers;
this.pubsubTopic = pubsubTopic;
this.newStream = newStream;
this.subscriptionCallbacks = new Map();
@ -89,53 +93,59 @@ class Subscription {
const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());
const stream = await this.newStream(this.peer);
const promises = this.peers.map(async (peer) => {
const stream = await this.newStream(peer);
const request = FilterSubscribeRpc.createSubscribeRequest(
this.pubsubTopic,
contentTopics
);
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
);
}
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
log.info(
"Subscribed to peer ",
this.peer.id.toString(),
"for content topics",
const request = FilterSubscribeRpc.createSubscribeRequest(
this.pubsubTopic,
contentTopics
);
} catch (e) {
throw new Error(
"Error subscribing to peer: " +
this.peer.id.toString() +
" for content topics: " +
contentTopics +
": " +
e
);
}
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
);
}
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
log.info(
"Subscribed to peer ",
peer.id.toString(),
"for content topics",
contentTopics
);
} catch (e) {
throw new Error(
"Error subscribing to peer: " +
peer.id.toString() +
" for content topics: " +
contentTopics +
": " +
e
);
}
});
const results = await Promise.allSettled(promises);
this.handleErrors(results, "subscribe");
// Save the callback functions by content topics so they
// can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`)
@ -155,125 +165,195 @@ class Subscription {
}
async unsubscribe(contentTopics: ContentTopic[]): Promise<void> {
const stream = await this.newStream(this.peer);
const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest(
this.pubsubTopic,
contentTopics
);
const promises = this.peers.map(async (peer) => {
const stream = await this.newStream(peer);
const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest(
this.pubsubTopic,
contentTopics
);
try {
await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink);
} catch (error) {
throw new Error("Error subscribing: " + error);
}
try {
await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink);
} catch (error) {
throw new Error("Error unsubscribing: " + error);
}
contentTopics.forEach((contentTopic: string) => {
this.subscriptionCallbacks.delete(contentTopic);
contentTopics.forEach((contentTopic: string) => {
this.subscriptionCallbacks.delete(contentTopic);
});
});
const results = await Promise.allSettled(promises);
this.handleErrors(results, "unsubscribe");
}
async ping(): Promise<void> {
const stream = await this.newStream(this.peer);
const promises = this.peers.map(async (peer) => {
const stream = await this.newStream(peer);
const request = FilterSubscribeRpc.createSubscriberPingRequest();
const request = FilterSubscribeRpc.createSubscriberPingRequest();
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
);
}
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
log.info(`Ping successful for peer ${peer.id.toString()}`);
} catch (error) {
log.error("Error pinging: ", error);
throw error; // Rethrow the actual error instead of wrapping it
}
});
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
const results = await Promise.allSettled(promises);
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
log.info("Ping successful");
} catch (error) {
log.error("Error pinging: ", error);
throw new Error("Error pinging: " + error);
}
this.handleErrors(results, "ping");
}
async unsubscribeAll(): Promise<void> {
const stream = await this.newStream(this.peer);
const promises = this.peers.map(async (peer) => {
const stream = await this.newStream(peer);
const request = FilterSubscribeRpc.createUnsubscribeAllRequest(
this.pubsubTopic
);
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
const request = FilterSubscribeRpc.createUnsubscribeAllRequest(
this.pubsubTopic
);
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
}
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
);
}
if (statusCode < 200 || statusCode >= 300) {
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
this.subscriptionCallbacks.clear();
log.info(
`Unsubscribed from all content topics for pubsub topic ${this.pubsubTopic}`
);
} catch (error) {
throw new Error(
`Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
"Error unsubscribing from all content topics: " + error
);
}
});
this.subscriptionCallbacks.clear();
log.info("Unsubscribed from all content topics");
} catch (error) {
throw new Error("Error unsubscribing from all content topics: " + error);
}
const results = await Promise.allSettled(promises);
this.handleErrors(results, "unsubscribeAll");
}
async processMessage(message: WakuMessage): Promise<void> {
const contentTopic = message.contentTopic;
const hashedMessageStr = messageHashStr(
this.pubsubTopic,
message as IProtoMessage
);
if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
log.info("Message already received, skipping");
return;
}
this.receivedMessagesHashStr.push(hashedMessageStr);
const { contentTopic } = message;
const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic);
if (!subscriptionCallback) {
log.error("No subscription callback available for ", contentTopic);
return;
}
log.info(
"Processing message with content topic ",
contentTopic,
" on pubsub topic ",
this.pubsubTopic
);
await pushMessage(subscriptionCallback, this.pubsubTopic, message);
}
// Filter out only the rejected promises and extract & handle their reasons
private handleErrors(
results: PromiseSettledResult<void>[],
type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll"
): void {
const errors = results
.filter(
(result): result is PromiseRejectedResult =>
result.status === "rejected"
)
.map((rejectedResult) => rejectedResult.reason);
if (errors.length === this.peers.length) {
const errorCounts = new Map<string, number>();
// TODO: streamline error logging with https://github.com/orgs/waku-org/projects/2/views/1?pane=issue&itemId=42849952
errors.forEach((error) => {
const message = error instanceof Error ? error.message : String(error);
errorCounts.set(message, (errorCounts.get(message) || 0) + 1);
});
const uniqueErrorMessages = Array.from(
errorCounts,
([message, count]) => `${message} (occurred ${count} times)`
).join(", ");
throw new Error(`Error ${type} all peers: ${uniqueErrorMessages}`);
} else if (errors.length > 0) {
// TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463)
log.warn(
`Some ${type} failed. These will be refreshed with new peers`,
errors
);
} else {
log.info(`${type} successful for all peers`);
}
}
}
class Filter extends BaseProtocol implements IReceiver {
private activeSubscriptions = new Map<string, Subscription>();
private readonly NUM_PEERS_PROTOCOL = 1;
private getActiveSubscription(
pubsubTopic: PubsubTopic,
peerIdStr: PeerIdStr
pubsubTopic: PubsubTopic
): Subscription | undefined {
return this.activeSubscriptions.get(`${pubsubTopic}_${peerIdStr}`);
return this.activeSubscriptions.get(pubsubTopic);
}
private setActiveSubscription(
pubsubTopic: PubsubTopic,
peerIdStr: PeerIdStr,
subscription: Subscription
): Subscription {
this.activeSubscriptions.set(`${pubsubTopic}_${peerIdStr}`, subscription);
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
}
@ -287,6 +367,12 @@ class Filter extends BaseProtocol implements IReceiver {
this.activeSubscriptions = new Map();
}
/**
* Creates a new subscription to the given pubsub topic.
* The subscription is made to multiple peers for decentralization.
* @param pubsubTopicShardInfo The pubsub topic to subscribe to.
* @returns The subscription object.
*/
async createSubscription(
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): Promise<Subscription> {
@ -297,23 +383,24 @@ class Filter extends BaseProtocol implements IReceiver {
ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics);
const peer = (
await this.getPeers({
maxBootstrapPeers: 1,
numPeers: this.NUM_PEERS_PROTOCOL
})
)[0];
if (!peer) {
const peers = await this.getPeers({
maxBootstrapPeers: 1,
numPeers: this.numPeersToUse
});
if (peers.length === 0) {
throw new Error("No peer found to initiate subscription.");
}
log.info(
`Creating filter subscription with ${peers.length} peers: `,
peers.map((peer) => peer.id.toString())
);
const subscription =
this.getActiveSubscription(pubsubTopic, peer.id.toString()) ??
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
peer.id.toString(),
new Subscription(pubsubTopic, peer, this.getStream.bind(this, peer))
new Subscription(pubsubTopic, peers, this.getStream.bind(this))
);
return subscription;
@ -360,8 +447,11 @@ class Filter extends BaseProtocol implements IReceiver {
}
private onRequest(streamData: IncomingStreamData): void {
const { connection, stream } = streamData;
const { remotePeer } = connection;
log.info(`Received message from ${remotePeer.toString()}`);
try {
pipe(streamData.stream, lp.decode, async (source) => {
pipe(stream, lp.decode, async (source) => {
for await (const bytes of source) {
const response = FilterPushRpc.decode(bytes.slice());
@ -377,11 +467,7 @@ class Filter extends BaseProtocol implements IReceiver {
return;
}
const peerIdStr = streamData.connection.remotePeer.toString();
const subscription = this.getActiveSubscription(
pubsubTopic,
peerIdStr
);
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(

View File

@ -42,8 +42,6 @@ type PreparePushMessageResult =
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
class LightPush extends BaseProtocol implements ILightPush {
private readonly NUM_PEERS_PROTOCOL = 1;
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components, log, options);
}
@ -106,7 +104,7 @@ class LightPush extends BaseProtocol implements ILightPush {
const peers = await this.getPeers({
maxBootstrapPeers: 1,
numPeers: this.NUM_PEERS_PROTOCOL
numPeers: this.numPeersToUse
});
if (!peers.length) {
@ -172,6 +170,8 @@ class LightPush extends BaseProtocol implements ILightPush {
});
const results = await Promise.allSettled(promises);
// TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463)
const errors = results
.filter(
(

View File

@ -69,6 +69,14 @@ export type ProtocolCreateOptions = {
* Notes that some values are overridden by {@link @waku/core!WakuNode} to ensure it implements the Waku protocol.
*/
libp2p?: Partial<CreateLibp2pOptions>;
/**
* Number of peers to connect to, for the usage of the protocol.
* This is used by:
* - Light Push to send messages,
* - Filter to retrieve messages.
* Defaults to 3.
*/
numPeersToUse?: number;
/**
* Byte array used as key for the noise protocol used for connection encryption
* by [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)

View File

@ -1,6 +1,6 @@
import { sha256 } from "@noble/hashes/sha256";
import type { IProtoMessage } from "@waku/interfaces";
import { concat, utf8ToBytes } from "@waku/utils/bytes";
import { bytesToUtf8, concat, utf8ToBytes } from "@waku/utils/bytes";
/**
* Deterministic Message Hashing as defined in
@ -27,3 +27,12 @@ export function messageHash(
}
return sha256(bytes);
}
export function messageHashStr(
pubsubTopic: string,
message: IProtoMessage
): string {
const hash = messageHash(pubsubTopic, message);
const hashStr = bytesToUtf8(hash);
return hashStr;
}

View File

@ -1,2 +1,294 @@
export * from "./message_collector.js";
export * from "./service_node.js";
import { DecodedMessage } from "@waku/core";
import {
DefaultPubsubTopic,
PubsubTopic,
ShardingParams
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { expect } from "chai";
import { Args, MessageRpcQuery, MessageRpcResponse } from "../types";
import { delay, makeLogFileName } from "../utils/index.js";
import { MessageCollector } from "./message_collector.js";
import { defaultArgs, ServiceNode } from "./service_node.js";
export { ServiceNode, MessageCollector, defaultArgs };
const log = new Logger("test:message-collector");
/**
* This class is a wrapper over the ServiceNode & MessageCollector class
* that allows for the creation & handling of multiple ServiceNodes
*/
export class ServiceNodesFleet {
static async createAndRun(
mochaContext: Mocha.Context,
pubsubTopics: PubsubTopic[],
nodesToCreate: number = 3,
strictChecking: boolean = false,
shardInfo?: ShardingParams,
_args?: Args,
withoutFilter = false
): Promise<ServiceNodesFleet> {
const serviceNodePromises = Array.from(
{ length: nodesToCreate },
async () => {
const node = new ServiceNode(
makeLogFileName(mochaContext) +
Math.random().toString(36).substring(7)
);
const args = getArgs(pubsubTopics, shardInfo, _args);
await node.start(args, {
retries: 3
});
return node;
}
);
const nodes = await Promise.all(serviceNodePromises);
return new ServiceNodesFleet(nodes, withoutFilter, strictChecking);
}
/**
* Convert a [[WakuMessage]] to a [[WakuRelayMessage]]. The latter is used
* by the nwaku JSON-RPC API.
*/
static toMessageRpcQuery(message: {
payload: Uint8Array;
contentTopic: string;
timestamp?: Date;
}): MessageRpcQuery {
return ServiceNode.toMessageRpcQuery(message);
}
public messageCollector: MultipleNodesMessageCollector;
private constructor(
public nodes: ServiceNode[],
relay: boolean,
private strictChecking: boolean
) {
const _messageCollectors: MessageCollector[] = [];
this.nodes.forEach((node) => {
_messageCollectors.push(new MessageCollector(node));
});
this.messageCollector = new MultipleNodesMessageCollector(
_messageCollectors,
relay ? this.nodes : undefined,
strictChecking
);
}
get type(): "go-waku" | "nwaku" {
const nodeType = new Set(
this.nodes.map((node) => {
return node.type();
})
);
if (nodeType.size > 1) {
throw new Error("Multiple node types");
}
return nodeType.values().next().value;
}
async start(): Promise<void> {
const startPromises = this.nodes.map((node) => node.start());
await Promise.all(startPromises);
}
async sendRelayMessage(
message: MessageRpcQuery,
pubsubTopic?: string,
raw = false
): Promise<boolean> {
let relayMessagePromises: Promise<boolean>[];
if (raw) {
relayMessagePromises = this.nodes.map((node) =>
node.rpcCall<boolean>("post_waku_v2_relay_v1_message", [
pubsubTopic && pubsubTopic,
message
])
);
} else {
relayMessagePromises = this.nodes.map((node) =>
node.sendMessage(message, pubsubTopic)
);
}
const relayMessages = await Promise.all(relayMessagePromises);
return relayMessages.every((message) => message);
}
async confirmMessageLength(numMessages: number): Promise<void> {
if (this.strictChecking) {
await Promise.all(
this.nodes.map(async (node) =>
expect(await node.messages()).to.have.length(numMessages)
)
);
} else {
// Wait for all promises to resolve and check if any meets the condition
const results = await Promise.all(
this.nodes.map(async (node) => {
const msgs = await node.messages();
return msgs.length === numMessages;
})
);
// Check if at least one result meets the condition
const conditionMet = results.some((result) => result);
expect(conditionMet).to.be.true;
}
}
}
class MultipleNodesMessageCollector {
callback: (msg: DecodedMessage) => void = () => {};
messageList: Array<DecodedMessage> = [];
constructor(
private messageCollectors: MessageCollector[],
private relayNodes?: ServiceNode[],
private strictChecking: boolean = false
) {
this.callback = (msg: DecodedMessage): void => {
log.info("Got a message");
this.messageList.push(msg);
};
}
get count(): number {
return this.messageList.length;
}
public hasMessage(topic: string, text: string): boolean {
if (this.strictChecking) {
return this.messageCollectors.every((collector) =>
collector.hasMessage(topic, text)
);
} else {
return this.messageCollectors.some((collector) =>
collector.hasMessage(topic, text)
);
}
}
getMessage(index: number): MessageRpcResponse | DecodedMessage {
return this.messageList[index];
}
/**
* Verifies a received message against expected values on all nodes.
* Returns true if any node's collector verifies the message successfully.
*/
verifyReceivedMessage(
index: number,
options: {
expectedMessageText: string | Uint8Array | undefined;
expectedContentTopic?: string;
expectedPubsubTopic?: string;
expectedVersion?: number;
expectedMeta?: Uint8Array;
expectedEphemeral?: boolean;
expectedTimestamp?: bigint | number;
checkTimestamp?: boolean;
}
): boolean {
if (this.strictChecking) {
return this.messageCollectors.every((collector) => {
try {
collector.verifyReceivedMessage(index, options);
return true; // Verification successful
} catch (error) {
return false; // Verification failed, continue with the next collector
}
});
} else {
return this.messageCollectors.some((collector) => {
try {
collector.verifyReceivedMessage(index, options);
return true; // Verification successful
} catch (error) {
return false; // Verification failed, continue with the next collector
}
});
}
}
/**
* Waits for a total number of messages across all nodes.
*/
async waitForMessages(
numMessages: number,
options?: {
pubsubTopic?: string;
timeoutDuration?: number;
exact?: boolean;
}
): Promise<boolean> {
const startTime = Date.now();
const pubsubTopic = options?.pubsubTopic || DefaultPubsubTopic;
const timeoutDuration = options?.timeoutDuration || 400;
const exact = options?.exact || false;
while (this.messageList.length < numMessages) {
if (this.relayNodes) {
if (this.strictChecking) {
const results = await Promise.all(
this.relayNodes.map(async (node) => {
const msgs = await node.messages(pubsubTopic);
return msgs.length >= numMessages;
})
);
return results.every((result) => result);
} else {
const results = await Promise.all(
this.relayNodes.map(async (node) => {
const msgs = await node.messages(pubsubTopic);
return msgs.length >= numMessages;
})
);
return results.some((result) => result);
}
}
if (Date.now() - startTime > timeoutDuration * numMessages) {
return false;
}
await delay(10);
}
if (exact) {
if (this.messageList.length == numMessages) {
return true;
} else {
log.warn(
`Was expecting exactly ${numMessages} messages. Received: ${this.messageList.length}`
);
return false;
}
} else {
return true;
}
}
}
function getArgs(
pubsubTopics: PubsubTopic[],
shardInfo?: ShardingParams,
args?: Args
): Args {
const defaultArgs = {
lightpush: true,
filter: true,
discv5Discovery: true,
peerExchange: true,
relay: true,
pubsubTopic: pubsubTopics,
...(shardInfo && { clusterId: shardInfo.clusterId })
} as Args;
return { ...defaultArgs, ...args };
}

View File

@ -5,8 +5,11 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { AssertionError, expect } from "chai";
import { equals } from "uint8arrays/equals";
import { base64ToUtf8, delay, ServiceNode } from "../index.js";
import { MessageRpcResponse } from "../types.js";
import { base64ToUtf8 } from "../utils/base64_utf8.js";
import { delay } from "../utils/delay.js";
import { ServiceNode } from "./service_node.js";
const log = new Logger("test:message-collector");

View File

@ -4,3 +4,4 @@ export * from "./random_array.js";
export * from "./wait_for_remote_peer_with_codec.js";
export * from "./delay.js";
export * from "./base64_utf8.js";
export * from "./waitForConnections.js";

View File

@ -0,0 +1,20 @@
import type { LightNode } from "@waku/interfaces";
export async function waitForConnections(
numPeers: number,
waku: LightNode
): Promise<void> {
let connectionsLen = waku.libp2p.getConnections().length;
if (connectionsLen >= numPeers) {
return;
}
await new Promise<void>((resolve) => {
const cb = (): void => {
connectionsLen++;
if (connectionsLen >= numPeers) {
waku.libp2p.removeEventListener("peer:identify", cb);
resolve();
}
};
waku.libp2p.addEventListener("peer:identify", cb);
});
}

View File

@ -0,0 +1,20 @@
import type { LightNode } from "@waku/interfaces";
export async function waitForConnections(
numPeers: number,
waku: LightNode
): Promise<void> {
let connectionsLen = waku.libp2p.getConnections().length;
if (connectionsLen >= numPeers) {
return;
}
await new Promise<void>((resolve) => {
const cb = (): void => {
connectionsLen++;
if (connectionsLen >= numPeers) {
waku.libp2p.removeEventListener("peer:identify", cb);
resolve();
}
};
waku.libp2p.addEventListener("peer:identify", cb);
});
}

View File

@ -0,0 +1,117 @@
import {
DefaultPubsubTopic,
IFilterSubscription,
LightNode
} from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import { ServiceNodesFleet } from "../../src/index.js";
import {
runMultipleNodes,
teardownNodesWithRedundancy,
TestContentTopic,
TestDecoder,
TestEncoder,
validatePingError
} from "./utils";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter V2: Ping: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let subscription: IFilterSubscription;
this.beforeEach(async function () {
this.timeout(15000);
[serviceNodes, waku] = await runMultipleNodes(this, [DefaultPubsubTopic]);
subscription = await waku.filter.createSubscription();
});
this.afterEach(async function () {
this.timeout(15000);
await teardownNodesWithRedundancy(serviceNodes, waku);
});
it("Ping on subscribed peer", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
// If ping is successfull(node has active subscription) we receive a success status code.
await subscription.ping();
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm new messages are received after a ping.
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
});
it("Ping on peer without subscriptions", async function () {
await validatePingError(subscription);
});
it("Ping on unsubscribed peer", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await subscription.ping();
await subscription.unsubscribe([TestContentTopic]);
// Ping imediately after unsubscribe
await validatePingError(subscription);
});
it("Reopen subscription with peer with lost subscription", async function () {
const openSubscription = async (): Promise<void> => {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
};
const unsubscribe = async (): Promise<void> => {
await subscription.unsubscribe([TestContentTopic]);
};
const pingAndReinitiateSubscription = async (): Promise<void> => {
try {
await subscription.ping();
} catch (error) {
if (
error instanceof Error &&
error.message.includes("peer has no subscriptions")
) {
await openSubscription();
} else {
throw error;
}
}
};
// open subscription & ping -> should pass
await openSubscription();
await pingAndReinitiateSubscription();
// unsubscribe & ping -> should fail and reinitiate subscription
await unsubscribe();
await pingAndReinitiateSubscription();
// ping -> should pass as subscription is reinitiated
await pingAndReinitiateSubscription();
});
});
};
[true, false].map(runTests);

View File

@ -0,0 +1,321 @@
import { waitForRemotePeer } from "@waku/core";
import {
DefaultPubsubTopic,
IFilterSubscription,
LightNode,
Protocols
} from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
delay,
ServiceNodesFleet,
TEST_STRING,
TEST_TIMESTAMPS
} from "../../src/index.js";
import {
messageText,
runMultipleNodes,
teardownNodesWithRedundancy,
TestContentTopic,
TestDecoder,
TestEncoder
} from "./utils.js";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter V2: FilterPush: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let subscription: IFilterSubscription;
this.beforeEach(async function () {
this.timeout(15000);
[serviceNodes, waku] = await runMultipleNodes(this, [DefaultPubsubTopic]);
subscription = await waku.filter.createSubscription();
});
this.afterEach(async function () {
this.timeout(15000);
await teardownNodesWithRedundancy(serviceNodes, waku);
});
TEST_STRING.forEach((testItem) => {
it(`Check received message containing ${testItem.description}`, async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(testItem.value)
});
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: testItem.value,
expectedContentTopic: TestContentTopic
});
});
});
TEST_TIMESTAMPS.forEach((testItem) => {
it(`Check received message with timestamp: ${testItem} `, async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: testItem as any
},
DefaultPubsubTopic,
true
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
checkTimestamp: false,
expectedContentTopic: TestContentTopic
});
// Check if the timestamp matches
const timestamp = serviceNodes.messageCollector.getMessage(0).timestamp;
if (testItem == undefined) {
expect(timestamp).to.eq(undefined);
}
if (timestamp !== undefined && timestamp instanceof Date) {
expect(testItem?.toString()).to.contain(
timestamp.getTime().toString()
);
}
});
});
it("Check message with invalid timestamp is not received", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: "2023-09-06T12:05:38.609Z" as any
},
DefaultPubsubTopic,
true
);
// Verify that no message was received
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message on other pubsub topic is not received", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
"DefaultPubsubTopic"
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message with no pubsub topic is not received", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
undefined,
true
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message with no content topic is not received", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
DefaultPubsubTopic
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message with no payload is not received", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
timestamp: BigInt(Date.now()) * BigInt(1000000),
payload: undefined as any
},
DefaultPubsubTopic,
true
);
// For go-waku the message is received (it is possible to send a message with no payload)
if (serviceNodes.type == "go-waku") {
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
} else {
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
}
});
it("Check message with non string payload is not received", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
payload: 12345 as unknown as string,
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
DefaultPubsubTopic
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
it.skip("Check message received after jswaku node is restarted", async function () {
// Subscribe and send message
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
// Restart js-waku node
await waku.stop();
expect(waku.isStarted()).to.eq(false);
await waku.start();
expect(waku.isStarted()).to.eq(true);
// Redo the connection and create a new subscription
for (const node of this.serviceNodes) {
await waku.dial(await node.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
}
subscription = await waku.filter.createSubscription();
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm both messages were received.
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic
});
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2",
expectedContentTopic: TestContentTopic
});
});
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
it.skip("Check message received after nwaku node is restarted", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
// Restart nwaku node
await teardownNodesWithRedundancy(serviceNodes, []);
await serviceNodes.start();
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm both messages were received.
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic
});
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2",
expectedContentTopic: TestContentTopic
});
});
});
};
[true, false].map(runTests);

View File

@ -21,7 +21,8 @@ import {
ServiceNode,
tearDownNodes
} from "../../../src/index.js";
import { runNodes } from "../utils.js";
import { runNodes } from "./utils.js";
describe("Waku Filter V2: Multiple PubsubTopics", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level

View File

@ -1,6 +1,9 @@
import { DefaultPubsubTopic } from "@waku/interfaces";
import type { IFilterSubscription, LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import {
DefaultPubsubTopic,
IFilterSubscription,
LightNode
} from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
@ -9,13 +12,14 @@ import {
tearDownNodes
} from "../../../src/index.js";
import {
runNodes,
TestContentTopic,
TestDecoder,
TestEncoder,
validatePingError
} from "../utils.js";
import { runNodes } from "./utils.js";
describe("Waku Filter V2: Ping", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);

View File

@ -1,8 +1,11 @@
import { waitForRemotePeer } from "@waku/core";
import type { IFilterSubscription, LightNode } from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import {
DefaultPubsubTopic,
IFilterSubscription,
LightNode,
Protocols
} from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
@ -13,9 +16,9 @@ import {
TEST_STRING,
TEST_TIMESTAMPS
} from "../../../src/index.js";
import { runNodes } from "../../light-push/utils";
import {
messageText,
runNodes,
TestContentTopic,
TestDecoder,
TestEncoder

View File

@ -1,7 +1,10 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import type { IFilterSubscription, LightNode } from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import {
DefaultPubsubTopic,
IFilterSubscription,
LightNode,
Protocols
} from "@waku/interfaces";
import {
ecies,
generatePrivateKey,
@ -9,7 +12,7 @@ import {
getPublicKey,
symmetric
} from "@waku/message-encryption";
import { utf8ToBytes } from "@waku/utils/bytes";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
@ -25,13 +28,14 @@ import {
import {
messagePayload,
messageText,
runNodes,
TestContentTopic,
TestDecoder,
TestEncoder
} from "../utils.js";
describe("Waku Filter V2: Subscribe", function () {
import { runNodes } from "./utils.js";
describe("Waku Filter V2: Subscribe: Single Service Node", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
@ -371,7 +375,7 @@ describe("Waku Filter V2: Subscribe", function () {
// Send messages to the first set of topics.
for (let i = 0; i < topicCount1; i++) {
const messageText = `Message for Topic ${i + 1}`;
const messageText = `Topic Set 1: Message Number: ${i + 1}`;
await waku.lightPush.send(td1.encoders[i], {
payload: utf8ToBytes(messageText)
});
@ -379,7 +383,8 @@ describe("Waku Filter V2: Subscribe", function () {
// Send messages to the second set of topics.
for (let i = 0; i < topicCount2; i++) {
const messageText = `Message for Topic ${i + 1}`;
const messageText = `Topic Set 2: Message Number: ${i + 1}`;
await waku.lightPush.send(td2.encoders[i], {
payload: utf8ToBytes(messageText)
});

View File

@ -1,7 +1,7 @@
import { createDecoder, createEncoder } from "@waku/core";
import type { IFilterSubscription, LightNode } from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { DefaultPubsubTopic, IFilterSubscription } from "@waku/interfaces";
import { LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
@ -10,10 +10,10 @@ import {
ServiceNode,
tearDownNodes
} from "../../../src/index.js";
import { runNodes } from "../../light-push/utils";
import {
messagePayload,
messageText,
runNodes,
TestContentTopic,
TestDecoder,
TestEncoder

View File

@ -0,0 +1,67 @@
import { waitForRemotePeer } from "@waku/core";
import {
DefaultPubsubTopic,
LightNode,
ProtocolCreateOptions,
Protocols,
ShardingParams
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { Logger } from "@waku/utils";
import { Context } from "mocha";
import {
makeLogFileName,
NOISE_KEY_1,
ServiceNode
} from "../../../src/index.js";
export const log = new Logger("test:filter:single_node");
export async function runNodes(
context: Context,
//TODO: change this to use `ShardInfo` instead of `string[]`
pubsubTopics: string[],
shardInfo?: ShardingParams
): Promise<[ServiceNode, LightNode]> {
const nwaku = new ServiceNode(makeLogFileName(context));
await nwaku.start(
{
filter: true,
lightpush: true,
relay: true,
pubsubTopic: pubsubTopics,
...(shardInfo && { clusterId: shardInfo.clusterId })
},
{ retries: 3 }
);
const waku_options: ProtocolCreateOptions = {
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
pubsubTopics: shardInfo ? undefined : pubsubTopics,
...((pubsubTopics.length !== 1 ||
pubsubTopics[0] !== DefaultPubsubTopic) && {
shardInfo: shardInfo
})
};
log.info("Starting js waku node with :", JSON.stringify(waku_options));
let waku: LightNode | undefined;
try {
waku = await createLightNode(waku_options);
await waku.start();
} catch (error) {
log.error("jswaku node failed to start:", error);
}
if (waku) {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
await nwaku.ensureSubscriptions(pubsubTopics);
return [nwaku, waku];
} else {
throw new Error("Failed to initialize waku");
}
}

View File

@ -0,0 +1,453 @@
import { createDecoder, createEncoder } from "@waku/core";
import {
DefaultPubsubTopic,
IFilterSubscription,
LightNode
} from "@waku/interfaces";
import {
ecies,
generatePrivateKey,
generateSymmetricKey,
getPublicKey,
symmetric
} from "@waku/message-encryption";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
delay,
generateTestData,
ServiceNodesFleet,
TEST_STRING
} from "../../src/index.js";
import {
messagePayload,
messageText,
runMultipleNodes,
teardownNodesWithRedundancy,
TestContentTopic,
TestDecoder,
TestEncoder
} from "./utils.js";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter V2: Subscribe: Multiple Service Nodes: Strict Check mode: ${strictCheckNodes}`, function () {
this.timeout(100000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let subscription: IFilterSubscription;
this.beforeEach(async function () {
this.timeout(15000);
[serviceNodes, waku] = await runMultipleNodes(
this,
[DefaultPubsubTopic],
strictCheckNodes
);
subscription = await waku.filter.createSubscription();
});
this.afterEach(async function () {
this.timeout(15000);
await teardownNodesWithRedundancy(serviceNodes, waku);
});
it("Subscribe and receive messages via lightPush", async function () {
expect(waku.libp2p.getConnections()).has.length(3);
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
await serviceNodes.confirmMessageLength(1);
});
it("Subscribe and receive ecies encrypted messages via lightPush", async function () {
const privateKey = generatePrivateKey();
const publicKey = getPublicKey(privateKey);
const encoder = ecies.createEncoder({
contentTopic: TestContentTopic,
publicKey
});
const decoder = ecies.createDecoder(TestContentTopic, privateKey);
await subscription.subscribe(
[decoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(encoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedVersion: 1
});
await serviceNodes.confirmMessageLength(1);
});
it("Subscribe and receive symmetrically encrypted messages via lightPush", async function () {
const symKey = generateSymmetricKey();
const encoder = symmetric.createEncoder({
contentTopic: TestContentTopic,
symKey
});
const decoder = symmetric.createDecoder(TestContentTopic, symKey);
await subscription.subscribe(
[decoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(encoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedVersion: 1
});
await serviceNodes.confirmMessageLength(1);
});
it("Subscribe and receive messages via waku relay post", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await delay(400);
// Send a test message using the relay post method.
const relayMessage = ServiceNodesFleet.toMessageRpcQuery({
contentTopic: TestContentTopic,
payload: utf8ToBytes(messageText)
});
await serviceNodes.sendRelayMessage(relayMessage);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
await serviceNodes.confirmMessageLength(1);
});
it("Subscribe and receive 2 messages on the same topic", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
// Send another message on the same topic.
const newMessageText = "Filtering still works!";
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(newMessageText)
});
// Verify that the second message was successfully received.
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: newMessageText,
expectedContentTopic: TestContentTopic
});
await serviceNodes.confirmMessageLength(2);
});
it("Subscribe and receive messages on 2 different content topics", async function () {
// Subscribe to the first content topic and send a message.
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
// Modify subscription to include a new content topic and send a message.
const newMessageText = "Filtering still works!";
const newMessagePayload = { payload: utf8ToBytes(newMessageText) };
const newContentTopic = "/test/2/waku-filter";
const newEncoder = createEncoder({ contentTopic: newContentTopic });
const newDecoder = createDecoder(newContentTopic);
await subscription.subscribe(
[newDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(newEncoder, {
payload: utf8ToBytes(newMessageText)
});
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: newContentTopic,
expectedMessageText: newMessageText
});
// Send another message on the initial content topic to verify it still works.
await waku.lightPush.send(TestEncoder, newMessagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(2, {
expectedMessageText: newMessageText,
expectedContentTopic: TestContentTopic
});
await serviceNodes.confirmMessageLength(3);
});
it("Subscribe and receives messages on 20 topics", async function () {
const topicCount = 20;
const td = generateTestData(topicCount);
// Subscribe to all 20 topics.
for (let i = 0; i < topicCount; i++) {
await subscription.subscribe(
[td.decoders[i]],
serviceNodes.messageCollector.callback
);
}
// Send a unique message on each topic.
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
// Verify that each message was received on the corresponding topic.
expect(await serviceNodes.messageCollector.waitForMessages(20)).to.eq(
true
);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`
});
});
});
it("Subscribe to 30 topics at once and receives messages", async function () {
const topicCount = 30;
const td = generateTestData(topicCount);
// Subscribe to all 30 topics.
await subscription.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
// Send a unique message on each topic.
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
// Verify that each message was received on the corresponding topic.
expect(await serviceNodes.messageCollector.waitForMessages(30)).to.eq(
true
);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`
});
});
});
it("Error when try to subscribe to more than 30 topics", async function () {
const topicCount = 31;
const td = generateTestData(topicCount);
// Attempt to subscribe to 31 topics
try {
await subscription.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
throw new Error(
"Subscribe to 31 topics was successful but was expected to fail with a specific error."
);
} catch (err) {
if (
err instanceof Error &&
err.message.includes("exceeds maximum content topics: 30")
) {
return;
} else {
throw err;
}
}
});
it("Overlapping topic subscription", async function () {
// Define two sets of test data with overlapping topics.
const topicCount1 = 2;
const td1 = generateTestData(topicCount1);
const topicCount2 = 4;
const td2 = generateTestData(topicCount2);
// Subscribe to the first set of topics.
await subscription.subscribe(
td1.decoders,
serviceNodes.messageCollector.callback
);
// Subscribe to the second set of topics which has overlapping topics with the first set.
await subscription.subscribe(
td2.decoders,
serviceNodes.messageCollector.callback
);
// Send messages to the first set of topics.
for (let i = 0; i < topicCount1; i++) {
const messageText = `Topic Set 1: Message Number: ${i + 1}`;
await waku.lightPush.send(td1.encoders[i], {
payload: utf8ToBytes(messageText)
});
}
// Send messages to the second set of topics.
for (let i = 0; i < topicCount2; i++) {
const messageText = `Topic Set 2: Message Number: ${i + 1}`;
await waku.lightPush.send(td2.encoders[i], {
payload: utf8ToBytes(messageText)
});
}
// Check if all messages were received.
// Since there are overlapping topics, there should be 6 messages in total (2 from the first set + 4 from the second set).
expect(
await serviceNodes.messageCollector.waitForMessages(6, { exact: true })
).to.eq(true);
});
it("Refresh subscription", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
// Resubscribe (refresh) to the same topic and send another message.
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm both messages were received.
expect(
await serviceNodes.messageCollector.waitForMessages(2, { exact: true })
).to.eq(true);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic
});
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2",
expectedContentTopic: TestContentTopic
});
});
TEST_STRING.forEach((testItem) => {
it(`Subscribe to topic containing ${testItem.description} and receive message`, async function () {
const newContentTopic = testItem.value;
const newEncoder = createEncoder({ contentTopic: newContentTopic });
const newDecoder = createDecoder(newContentTopic);
await subscription.subscribe(
[newDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(newEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: newContentTopic
});
});
});
it("Add multiple subscription objects on single nwaku node", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
// Create a second subscription on a different topic
const subscription2 = await waku.filter.createSubscription();
const newContentTopic = "/test/2/waku-filter";
const newEncoder = createEncoder({ contentTopic: newContentTopic });
const newDecoder = createDecoder(newContentTopic);
await subscription2.subscribe(
[newDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
// Check if both messages were received
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic
});
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: newContentTopic,
expectedMessageText: "M2"
});
});
});
};
[true, false].map((strictCheckNodes) => runTests(strictCheckNodes));

View File

@ -0,0 +1,216 @@
import { createDecoder, createEncoder } from "@waku/core";
import {
DefaultPubsubTopic,
IFilterSubscription,
LightNode
} from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import { generateTestData, ServiceNodesFleet } from "../../src/index.js";
import {
messagePayload,
messageText,
runMultipleNodes,
teardownNodesWithRedundancy,
TestContentTopic,
TestDecoder,
TestEncoder
} from "./utils.js";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter V2: Unsubscribe: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let subscription: IFilterSubscription;
this.beforeEach(async function () {
this.timeout(15000);
[serviceNodes, waku] = await runMultipleNodes(this, [DefaultPubsubTopic]);
subscription = await waku.filter.createSubscription();
});
this.afterEach(async function () {
this.timeout(15000);
await teardownNodesWithRedundancy(serviceNodes, waku);
});
it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
// Unsubscribe from the topic and send again
await subscription.unsubscribe([TestContentTopic]);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
false
);
// Check that from 2 messages send only the 1st was received
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
expect(serviceNodes.messageCollector.count).to.eq(1);
await serviceNodes.confirmMessageLength(2);
});
it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
const newContentTopic = "/test/2/waku-filter";
const newEncoder = createEncoder({ contentTopic: newContentTopic });
const newDecoder = createDecoder(newContentTopic);
await subscription.subscribe(
[newDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
// Unsubscribe from the first topic and send again
await subscription.unsubscribe([TestContentTopic]);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") });
expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq(
true
);
// Check that from 4 messages send 3 were received
expect(serviceNodes.messageCollector.count).to.eq(3);
await serviceNodes.confirmMessageLength(4);
});
it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
const newContentTopic = "/test/2/waku-filter";
const newEncoder = createEncoder({ contentTopic: newContentTopic });
const newDecoder = createDecoder(newContentTopic);
await subscription.subscribe(
[newDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
// Unsubscribe from both and send again
await subscription.unsubscribe([TestContentTopic, newContentTopic]);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") });
expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq(
false
);
// Check that from 4 messages send 2 were received
expect(serviceNodes.messageCollector.count).to.eq(2);
await serviceNodes.confirmMessageLength(4);
});
it("Unsubscribe topics the node is not subscribed to", async function () {
// Subscribe to 1 topic and send message
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
expect(serviceNodes.messageCollector.count).to.eq(1);
// Unsubscribe from topics that the node is not not subscribed to and send again
await subscription.unsubscribe([]);
await subscription.unsubscribe(["/test/2/waku-filter"]);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
// Check that both messages were received
expect(serviceNodes.messageCollector.count).to.eq(2);
await serviceNodes.confirmMessageLength(2);
});
it("Unsubscribes all - node subscribed to 1 topic", async function () {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
expect(serviceNodes.messageCollector.count).to.eq(1);
// Unsubscribe from all topics and send again
await subscription.unsubscribeAll();
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
false
);
// Check that from 2 messages send only the 1st was received
expect(serviceNodes.messageCollector.count).to.eq(1);
await serviceNodes.confirmMessageLength(2);
});
it("Unsubscribes all - node subscribed to 10 topics", async function () {
// Subscribe to 10 topics and send message
const topicCount = 10;
const td = generateTestData(topicCount);
await subscription.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`M${i + 1}`)
});
}
expect(await serviceNodes.messageCollector.waitForMessages(10)).to.eq(
true
);
// Unsubscribe from all topics and send again
await subscription.unsubscribeAll();
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`M${topicCount + i + 1}`)
});
}
expect(await serviceNodes.messageCollector.waitForMessages(11)).to.eq(
false
);
// Check that from 20 messages send only 10 were received
expect(serviceNodes.messageCollector.count).to.eq(10);
await serviceNodes.confirmMessageLength(20);
});
});
};
[true, false].map(runTests);

View File

@ -3,15 +3,22 @@ import {
DefaultPubsubTopic,
IFilterSubscription,
LightNode,
ProtocolCreateOptions,
Protocols,
ShardingParams
ShardingParams,
Waku
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { Logger } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { Context } from "mocha";
import pRetry from "p-retry";
import { makeLogFileName, NOISE_KEY_1, ServiceNode } from "../../src/index.js";
import {
NOISE_KEY_1,
ServiceNodesFleet,
waitForConnections
} from "../../src/index.js";
// Constants for test configuration.
export const log = new Logger("test:filter");
@ -42,28 +49,31 @@ export async function validatePingError(
}
}
export async function runNodes(
export async function runMultipleNodes(
context: Context,
//TODO: change this to use `ShardInfo` instead of `string[]`
pubsubTopics: string[],
shardInfo?: ShardingParams
): Promise<[ServiceNode, LightNode]> {
const nwaku = new ServiceNode(makeLogFileName(context));
await nwaku.start(
{
filter: true,
lightpush: true,
relay: true,
pubsubTopic: pubsubTopics,
...(shardInfo && { clusterId: shardInfo.clusterId })
},
{ retries: 3 }
strictChecking: boolean = false,
shardInfo?: ShardingParams,
numServiceNodes = 3,
withoutFilter = false
): Promise<[ServiceNodesFleet, LightNode]> {
// create numServiceNodes nodes
const serviceNodes = await ServiceNodesFleet.createAndRun(
context,
pubsubTopics,
numServiceNodes,
strictChecking,
shardInfo,
undefined,
withoutFilter
);
const waku_options = {
const waku_options: ProtocolCreateOptions = {
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
libp2p: {
addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] }
},
pubsubTopics: shardInfo ? undefined : pubsubTopics,
...((pubsubTopics.length !== 1 ||
pubsubTopics[0] !== DefaultPubsubTopic) && {
@ -84,18 +94,61 @@ export async function runNodes(
throw new Error("Failed to initialize waku");
}
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
await nwaku.ensureSubscriptions(pubsubTopics);
for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
await node.ensureSubscriptions(pubsubTopics);
const wakuConnections = waku.libp2p.getConnections();
const nwakuPeers = await nwaku.peers();
const wakuConnections = waku.libp2p.getConnections();
const nodePeers = await node.peers();
if (wakuConnections.length < 1 || nwakuPeers.length < 1) {
throw new Error(
`Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and nwaku: ${nwakuPeers.length}`
);
if (wakuConnections.length < 1 || nodePeers.length < 1) {
throw new Error(
`Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and service nodes: ${nodePeers.length}`
);
}
}
return [nwaku, waku];
await waitForConnections(numServiceNodes, waku);
return [serviceNodes, waku];
}
export async function teardownNodesWithRedundancy(
serviceNodes: ServiceNodesFleet,
wakuNodes: Waku | Waku[]
): Promise<void> {
const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes];
const stopNwakuNodes = serviceNodes.nodes.map(async (node) => {
await pRetry(
async () => {
try {
await node.stop();
} catch (error) {
log.error("Service Node failed to stop:", error);
throw error;
}
},
{ retries: 3 }
);
});
const stopWakuNodes = wNodes.map(async (waku) => {
if (waku) {
await pRetry(
async () => {
try {
await waku.stop();
} catch (error) {
log.error("Waku failed to stop:", error);
throw error;
}
},
{ retries: 3 }
);
}
});
await Promise.all([...stopNwakuNodes, ...stopWakuNodes]);
}

View File

@ -0,0 +1,268 @@
import { createEncoder } from "@waku/core";
import {
DefaultPubsubTopic,
IRateLimitProof,
LightNode,
SendError
} from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
generateRandomUint8Array,
ServiceNodesFleet,
TEST_STRING
} from "../../src";
import {
runMultipleNodes,
teardownNodesWithRedundancy
} from "../filter/utils.js";
import {
messagePayload,
messageText,
TestContentTopic,
TestEncoder
} from "./utils";
const runTests = (strictNodeCheck: boolean): void => {
const numServiceNodes = 3;
describe(`Waku Light Push: Multiple Nodes: Strict Check: ${strictNodeCheck}`, function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(15000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
this.beforeEach(async function () {
this.timeout(15000);
[serviceNodes, waku] = await runMultipleNodes(
this,
[DefaultPubsubTopic],
strictNodeCheck,
undefined,
numServiceNodes,
true
);
});
this.afterEach(async function () {
this.timeout(15000);
await teardownNodesWithRedundancy(serviceNodes, waku);
});
TEST_STRING.forEach((testItem) => {
it(`Push message with ${testItem.description} payload`, async function () {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(testItem.value)
});
expect(pushResponse.recipients.length).to.eq(numServiceNodes);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: testItem.value,
expectedContentTopic: TestContentTopic
});
});
});
it("Push 30 different messages", async function () {
const generateMessageText = (index: number): string => `M${index}`;
for (let i = 0; i < 30; i++) {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(generateMessageText(i))
});
expect(pushResponse.recipients.length).to.eq(numServiceNodes);
}
expect(await serviceNodes.messageCollector.waitForMessages(30)).to.eq(
true
);
for (let i = 0; i < 30; i++) {
serviceNodes.messageCollector.verifyReceivedMessage(i, {
expectedMessageText: generateMessageText(i),
expectedContentTopic: TestContentTopic
});
}
});
it("Throws when trying to push message with empty payload", async function () {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: new Uint8Array()
});
expect(pushResponse.recipients.length).to.eq(0);
console.log("validated 1");
expect(pushResponse.errors).to.include(SendError.EMPTY_PAYLOAD);
console.log("validated 2");
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
console.log("validated 3");
});
TEST_STRING.forEach((testItem) => {
it(`Push message with content topic containing ${testItem.description}`, async function () {
const customEncoder = createEncoder({
contentTopic: testItem.value
});
const pushResponse = await waku.lightPush.send(
customEncoder,
messagePayload
);
expect(pushResponse.recipients.length).to.eq(numServiceNodes);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: testItem.value
});
});
});
it("Fails to push message with empty content topic", async function () {
try {
createEncoder({ contentTopic: "" });
expect.fail("Expected an error but didn't get one");
} catch (error) {
expect((error as Error).message).to.equal(
"Content topic must be specified"
);
}
});
it("Push message with meta", async function () {
const customTestEncoder = createEncoder({
contentTopic: TestContentTopic,
metaSetter: () => new Uint8Array(10)
});
const pushResponse = await waku.lightPush.send(
customTestEncoder,
messagePayload
);
expect(pushResponse.recipients.length).to.eq(numServiceNodes);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
});
it("Fails to push message with large meta", async function () {
const customTestEncoder = createEncoder({
contentTopic: TestContentTopic,
metaSetter: () => new Uint8Array(105024) // see the note below ***
});
// *** note: this test used 10 ** 6 when `nwaku` node had MaxWakuMessageSize == 1MiB ( 1*2^20 .)
// `nwaku` establishes the max lightpush msg size as `const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024`
// see: https://github.com/waku-org/nwaku/blob/07beea02095035f4f4c234ec2dec1f365e6955b8/waku/waku_lightpush/rpc_codec.nim#L15
// In the PR https://github.com/waku-org/nwaku/pull/2298 we reduced the MaxWakuMessageSize
// from 1MiB to 150KiB. Therefore, the 105024 number comes from substracting ( 1*2^20 - 150*2^10 )
// to the original 10^6 that this test had when MaxWakuMessageSize == 1*2^20
const pushResponse = await waku.lightPush.send(
customTestEncoder,
messagePayload
);
if (serviceNodes.type == "go-waku") {
expect(pushResponse.recipients.length).to.eq(numServiceNodes);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
} else {
expect(pushResponse.recipients.length).to.eq(0);
expect(pushResponse.errors).to.include(SendError.REMOTE_PEER_REJECTED);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
}
});
it("Push message with rate limit", async function () {
const rateLimitProof: IRateLimitProof = {
proof: utf8ToBytes("proofData"),
merkleRoot: utf8ToBytes("merkleRootData"),
epoch: utf8ToBytes("epochData"),
shareX: utf8ToBytes("shareXData"),
shareY: utf8ToBytes("shareYData"),
nullifier: utf8ToBytes("nullifierData"),
rlnIdentifier: utf8ToBytes("rlnIdentifierData")
};
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(messageText),
rateLimitProof: rateLimitProof
});
expect(pushResponse.recipients.length).to.eq(numServiceNodes);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
});
[
Date.now() - 3600000 * 24 * 356,
Date.now() - 3600000,
Date.now() + 3600000
].forEach((testItem) => {
it(`Push message with custom timestamp: ${testItem}`, async function () {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(messageText),
timestamp: new Date(testItem)
});
expect(pushResponse.recipients.length).to.eq(numServiceNodes);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedTimestamp: testItem,
expectedContentTopic: TestContentTopic
});
});
});
it("Push message equal or less that 1MB", async function () {
const bigPayload = generateRandomUint8Array(65536);
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: bigPayload
});
expect(pushResponse.recipients.length).to.greaterThan(0);
});
it("Fails to push message bigger that 1MB", async function () {
const MB = 1024 ** 2;
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: generateRandomUint8Array(MB + 65536)
});
expect(pushResponse.recipients.length).to.eq(0);
expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
});
};
[true].map(runTests);

View File

@ -5,7 +5,7 @@ import {
LightNode,
SendError
} from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
@ -23,7 +23,7 @@ import {
TestEncoder
} from "../utils.js";
describe("Waku Light Push", function () {
describe("Waku Light Push: Single Node", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(15000);
let waku: LightNode;

View File

@ -26,6 +26,7 @@ export async function runNodes(
await nwaku.start(
{
lightpush: true,
filter: true,
relay: true,
pubsubTopic: pubsubTopics,
...(shardInfo && { clusterId: shardInfo.clusterId })