chore!: remove deprecated filter implementation (#2433)

* chore: remove deprecated filter implementation

* update tests

* skip IReceiver test, remove unused utility

* fix typo

* address comments

* unskip Filter e2e test

* address more comments, remove duplication

* skip CI test
This commit is contained in:
Sasha 2025-07-02 12:37:54 +02:00 committed by GitHub
parent cd1d909de3
commit 981248eedd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 995 additions and 3570 deletions

5
package-lock.json generated
View File

@ -12233,6 +12233,7 @@
"version": "3.9.0",
"resolved": "https://registry.npmjs.org/ci-info/-/ci-info-3.9.0.tgz",
"integrity": "sha512-NIxF55hv4nSqQswkAeiOi1r83xy8JldOFDTWiug55KBu9Jnblncd2U6ViHmYgHf01TPZS77NJBhBMKdWj9HQMQ==",
"dev": true,
"funding": [
{
"type": "github",
@ -16777,6 +16778,7 @@
"version": "1.0.16",
"resolved": "https://registry.npmjs.org/fastest-levenshtein/-/fastest-levenshtein-1.0.16.tgz",
"integrity": "sha512-eRnCtTTtGZFpQCwhJiUOuxPQWRXVKYDn0b2PeHfXL6/Zi53SLAzAHfVhVWK2AryC/WH05kGfxhFIPvTF0SXQzg==",
"dev": true,
"license": "MIT",
"engines": {
"node": ">= 4.9.1"
@ -22610,6 +22612,7 @@
"version": "9.0.3",
"resolved": "https://registry.npmjs.org/minimatch/-/minimatch-9.0.3.tgz",
"integrity": "sha512-RHiac9mvaRw0x3AYRgDC1CxAP7HTcNrrECeA8YYJeWnpo+2Q5CegtZjaotWTWxDG3UeGA1coE05iH1mPjT/2mg==",
"dev": true,
"license": "ISC",
"dependencies": {
"brace-expansion": "^2.0.1"
@ -23017,6 +23020,7 @@
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/mute-stream/-/mute-stream-2.0.0.tgz",
"integrity": "sha512-WWdIxpyjEn+FhQJQQv9aQAYlHoNVdzIzUySNV1gHUPDSdZJ3yZn7pAAbQcV7B56Mvu881q9FZV+0Vx2xC44VWA==",
"dev": true,
"license": "ISC",
"engines": {
"node": "^18.17.0 || >=20.5.0"
@ -30387,6 +30391,7 @@
"version": "4.1.0",
"resolved": "https://registry.npmjs.org/read/-/read-4.1.0.tgz",
"integrity": "sha512-uRfX6K+f+R8OOrYScaM3ixPY4erg69f8DN6pgTvMcA9iRc8iDhwrA4m3Yu8YYKsXJgVvum+m8PkRboZwwuLzYA==",
"dev": true,
"license": "ISC",
"dependencies": {
"mute-stream": "^2.0.0"

View File

@ -1,17 +1,78 @@
import type { PeerId } from "@libp2p/interface";
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { ContentTopic, ThisOrThat } from "./misc.js";
import type {
Callback,
ProtocolError,
SDKProtocolResult
} from "./protocols.js";
import type { IReceiver } from "./receiver.js";
import type { Callback } from "./protocols.js";
export type SubscriptionCallback<T extends IDecodedMessage> = {
decoders: IDecoder<T>[];
callback: Callback<T>;
export type IFilter = {
readonly multicodec: string;
/**
* Subscribes to messages that match the filtering criteria defined in the specified decoders.
* Executes a callback upon receiving each message.
* Checks for a valid peer connection before starting. Will wait until a peer is available.
*
* @param decoders - One or more decoders that specify the filtering criteria for this subscription.
* @param callback - Function called when a message matching the filtering criteria is received.
* @returns Promise that resolves to boolean indicating if the subscription was created successfully.
*
* @example
* // Subscribe to a single decoder
* await filter.subscribe(decoder, (msg) => console.log(msg));
*
* @example
* // Subscribe to multiple decoders with the same pubsub topic
* await filter.subscribe([decoder1, decoder2], (msg) => console.log(msg));
*
* @example
* // Handle subscription failure
* const success = await filter.subscribe(decoder, handleMessage);
* if (!success) {
* console.error("Failed to subscribe");
* }
*/
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<boolean>;
/**
* Unsubscribes from messages with specified decoders.
*
* @param decoders - Single decoder or array of decoders to unsubscribe from. All decoders must share the same pubsubTopic.
* @returns Promise that resolves to true if unsubscription was successful, false otherwise.
*
* @example
* // Unsubscribe from a single decoder
* await filter.unsubscribe(decoder);
*
* @example
* // Unsubscribe from multiple decoders at once
* await filter.unsubscribe([decoder1, decoder2]);
*
* @example
* // Handle unsubscription failure
* const success = await filter.unsubscribe(decoder);
* if (!success) {
* console.error("Failed to unsubscribe");
* }
*/
unsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<boolean>;
/**
* Unsubscribes from all active subscriptions across all pubsub topics.
*
* @example
* // Clean up all subscriptions when React component unmounts
* useEffect(() => {
* return () => filter.unsubscribeAll();
* }, [filter]);
*
* @example
* // Reset subscriptions and start over
* filter.unsubscribeAll();
* await filter.subscribe(newDecoder, newCallback);
*/
unsubscribeAll(): void;
};
export type FilterProtocolOptions = {
@ -30,52 +91,9 @@ export type FilterProtocolOptions = {
pingsBeforePeerRenewed: number;
/**
* Enables js-waku to send probe LightPush message over subscribed pubsubTopics on created subscription.
* In case message won't be received back through Filter - js-waku will attempt to subscribe to another peer.
* Number of peers to be used for establishing subscriptions.
*
* @default false
* @default 2
*/
enableLightPushFilterCheck: boolean;
numPeersToUse: number;
};
export interface ISubscription {
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<SDKProtocolResult>;
unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult>;
ping(peerId?: PeerId): Promise<SDKProtocolResult>;
unsubscribeAll(): Promise<SDKProtocolResult>;
}
export type IFilter = IReceiver & {
readonly multicodec: string;
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<SubscribeResult>;
};
export type SubscribeResult = SubscriptionSuccess | SubscriptionError;
type SubscriptionSuccess = {
subscription: ISubscription;
error: null;
results: SDKProtocolResult;
};
type SubscriptionError = {
subscription: null;
error: ProtocolError;
results: null;
};
export type CreateSubscriptionResult = ThisOrThat<
"subscription",
ISubscription,
"error",
ProtocolError
>;

View File

@ -1,98 +0,0 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { Callback } from "./protocols.js";
export type INextFilter = {
readonly multicodec: string;
/**
* Subscribes to messages with specified decoders and executes callback when a message is received.
* In case no peers available initially - will delay subscription till connects to any peer.
*
* @param decoders - Single decoder or array of decoders to subscribe to. All decoders must share the same pubsubTopic.
* @param callback - Function called when a message matching the decoder's contentTopic is received.
* @returns Promise that resolves to true if subscription was successful, false otherwise.
*
* @example
* // Subscribe to a single content topic
* await filter.subscribe(decoder, (msg) => console.log(msg));
*
* @example
* // Subscribe to multiple content topics with the same pubsub topic
* await filter.subscribe([decoder1, decoder2], (msg) => console.log(msg));
*
* @example
* // Handle subscription failure
* const success = await filter.subscribe(decoder, handleMessage);
* if (!success) {
* console.error("Failed to subscribe");
* }
*/
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<boolean>;
/**
* Unsubscribes from messages with specified decoders.
*
* @param decoders - Single decoder or array of decoders to unsubscribe from. All decoders must share the same pubsubTopic.
* @returns Promise that resolves to true if unsubscription was successful, false otherwise.
*
* @example
* // Unsubscribe from a single decoder
* await filter.unsubscribe(decoder);
*
* @example
* // Unsubscribe from multiple decoders at once
* await filter.unsubscribe([decoder1, decoder2]);
*
* @example
* // Handle unsubscription failure
* const success = await filter.unsubscribe(decoder);
* if (!success) {
* console.error("Failed to unsubscribe");
* }
*/
unsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<boolean>;
/**
* Unsubscribes from all active subscriptions across all pubsub topics.
*
* @example
* // Clean up all subscriptions when React component unmounts
* useEffect(() => {
* return () => filter.unsubscribeAll();
* }, [filter]);
*
* @example
* // Reset subscriptions and start over
* filter.unsubscribeAll();
* await filter.subscribe(newDecoder, newCallback);
*/
unsubscribeAll(): void;
};
export type NextFilterOptions = {
/**
* Interval with which Filter subscription will attempt to send ping requests to subscribed peers.
*
* @default 60_000
*/
keepAliveIntervalMs: number;
/**
* Number of failed pings allowed to make to a remote peer before attempting to subscribe to a new one.
*
* @default 3
*/
pingsBeforePeerRenewed: number;
/**
* Number of peers to be used for establishing subscriptions.
*
* @default 2
*/
numPeersToUse: number;
};

View File

@ -1,6 +1,5 @@
export * from "./enr.js";
export * from "./filter.js";
export * from "./filter_next.js";
export * from "./light_push.js";
export * from "./message.js";
export * from "./peer_exchange.js";

View File

@ -1,14 +1,10 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type {
ContentTopic,
IAsyncIterator,
PubsubTopic,
Unsubscribe
} from "./misc.js";
import type { IAsyncIterator, Unsubscribe } from "./misc.js";
import type { Callback } from "./protocols.js";
export type ActiveSubscriptions = Map<PubsubTopic, ContentTopic[]>;
/**
* @deprecated will be replaced in next version
*/
export interface IReceiver {
toSubscriptionIterator: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]

View File

@ -3,7 +3,6 @@ import type { MultiaddrInput } from "@multiformats/multiaddr";
import type { IConnectionManager } from "./connection_manager.js";
import type { IFilter } from "./filter.js";
import type { INextFilter } from "./filter_next.js";
import type { IHealthIndicator } from "./health_indicator.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js";
@ -36,12 +35,7 @@ export interface IWaku {
relay?: IRelay;
store?: IStore;
/**
* @deprecated use IWaku.nextFilter instead
*/
filter?: IFilter;
nextFilter?: INextFilter;
lightPush?: ILightPush;
connectionManager: IConnectionManager;
health: IHealthIndicator;
@ -217,7 +211,6 @@ export interface LightNode extends IWaku {
relay: undefined;
store: IStore;
filter: IFilter;
nextFilter: INextFilter;
lightPush: ILightPush;
}

View File

@ -9,7 +9,6 @@ import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { PubSub as Libp2pPubsub } from "@libp2p/interface";
import { sha256 } from "@noble/hashes/sha256";
import {
ActiveSubscriptions,
Callback,
CreateNodeOptions,
IAsyncIterator,
@ -42,6 +41,8 @@ export type Observer<T extends IDecodedMessage> = {
export type RelayCreateOptions = CreateNodeOptions & GossipsubOpts;
export type ContentTopic = string;
type ActiveSubscriptions = Map<PubsubTopic, ContentTopic[]>;
type RelayConstructorParams = {
libp2p: Libp2p;
pubsubTopics: PubsubTopic[];

View File

@ -76,11 +76,11 @@ describe("Longevity", function () {
await waku.waitForPeers([Protocols.Filter]);
const decoder = createDecoder(ContentTopic, singleShardInfo);
const { error } = await waku.filter.subscribe(
const hasSubscribed = await waku.filter.subscribe(
[decoder],
messageCollector.callback
);
if (error) throw error;
if (!hasSubscribed) throw new Error("Failed to subscribe from the start.");
const encoder = createEncoder({
contentTopic: ContentTopic,

View File

@ -1,3 +0,0 @@
export const DEFAULT_KEEP_ALIVE = 60_000;
export const DEFAULT_MAX_PINGS = 3;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false;

View File

@ -1,297 +1,189 @@
import { ConnectionManager, FilterCore } from "@waku/core";
import type {
Callback,
CreateSubscriptionResult,
FilterProtocolOptions,
IAsyncIterator,
IDecodedMessage,
IDecoder,
IFilter,
ILightPush,
Libp2p,
PubsubTopic,
SubscribeResult,
Unsubscribe
Libp2p
} from "@waku/interfaces";
import { NetworkConfig, ProtocolError } from "@waku/interfaces";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
Logger,
shardInfoToPubsubTopics,
toAsyncIterator
} from "@waku/utils";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
import { PeerManager } from "../peer_manager/index.js";
import { Subscription } from "./subscription.js";
import { buildConfig } from "./utils.js";
import { FilterConstructorParams } from "./types.js";
const log = new Logger("sdk:filter");
const log = new Logger("sdk:next-filter");
type FilterConstructorParams = {
connectionManager: ConnectionManager;
libp2p: Libp2p;
peerManager: PeerManager;
lightPush?: ILightPush;
options?: Partial<FilterProtocolOptions>;
};
type PubsubTopic = string;
export class Filter implements IFilter {
public readonly protocol: FilterCore;
private readonly libp2p: Libp2p;
private readonly protocol: FilterCore;
private readonly peerManager: PeerManager;
private readonly connectionManager: ConnectionManager;
private readonly config: FilterProtocolOptions;
private connectionManager: ConnectionManager;
private libp2p: Libp2p;
private peerManager: PeerManager;
private lightPush?: ILightPush;
private activeSubscriptions = new Map<string, Subscription>();
private subscriptions = new Map<PubsubTopic, Subscription>();
public constructor(params: FilterConstructorParams) {
this.config = buildConfig(params.options);
this.lightPush = params.lightPush;
this.peerManager = params.peerManager;
this.config = {
numPeersToUse: 2,
pingsBeforePeerRenewed: 3,
keepAliveIntervalMs: 60_000,
...params.options
};
this.libp2p = params.libp2p;
this.peerManager = params.peerManager;
this.connectionManager = params.connectionManager;
this.protocol = new FilterCore(
async (pubsubTopic, wakuMessage, peerIdStr) => {
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(
`No subscription locally registered for topic ${pubsubTopic}`
);
return;
}
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
},
this.onIncomingMessage.bind(this),
params.connectionManager.pubsubTopics,
params.libp2p
);
this.activeSubscriptions = new Map();
}
public get multicodec(): string {
return this.protocol.multicodec;
}
/**
* Opens a subscription with the Filter protocol using the provided decoders and callback.
* This method combines the functionality of creating a subscription and subscribing to it.
*
* @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
* @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
*
* @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
* - subscription: The created subscription object if successful, or null if failed.
* - error: A ProtocolError if the subscription creation failed, or null if successful.
* - results: An object containing arrays of failures and successes from the subscription process.
* Only present if the subscription was created successfully.
*
* @throws {Error} If there's an unexpected error during the subscription process.
*
* @remarks
* This method attempts to create a subscription using the pubsub topic derived from the provided decoders,
* then tries to subscribe using the created subscription. The return value should be interpreted as follows:
* - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely.
* - If `subscription` is non-null and `error` is null, the subscription was created successfully.
* In this case, check the `results` field for detailed information about successes and failures during the subscription process.
* - Even if the subscription was created successfully, there might be some failures in the results.
*
* @example
* ```typescript
* const {subscription, error, results} = await waku.filter.subscribe(decoders, callback);
* if (!subscription || error) {
* console.error("Failed to create subscription:", error);
* }
* console.log("Subscription created successfully");
* if (results.failures.length > 0) {
* console.warn("Some errors occurred during subscription:", results.failures);
* }
* console.log("Successful subscriptions:", results.successes);
*
* ```
*/
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<SubscribeResult> {
const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);
if (uniquePubsubTopics.length !== 1) {
return {
subscription: null,
error: ProtocolError.INVALID_DECODER_TOPICS,
results: null
};
public unsubscribeAll(): void {
for (const subscription of this.subscriptions.values()) {
subscription.stop();
}
const pubsubTopic = uniquePubsubTopics[0];
const { subscription, error } = await this.createSubscription(pubsubTopic);
if (error) {
return {
subscription: null,
error: error,
results: null
};
}
const { failures, successes } = await subscription.subscribe(
decoders,
callback
);
return {
subscription,
error: null,
results: {
failures: failures,
successes: successes
}
};
this.subscriptions.clear();
}
/**
* Creates a new subscription to the given pubsub topic.
* The subscription is made to multiple peers for decentralization.
* @param pubsubTopicShardInfo The pubsub topic to subscribe to.
* @returns The subscription object.
*/
private async createSubscription(
pubsubTopicShardInfo: NetworkConfig | PubsubTopic
): Promise<CreateSubscriptionResult> {
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0];
public async subscribe<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);
if (decoders.length === 0) {
throw Error("Cannot subscribe with 0 decoders.");
}
const peerIds = this.peerManager.getPeers();
if (peerIds.length === 0) {
return {
error: ProtocolError.NO_PEER_AVAILABLE,
subscription: null
};
const pubsubTopics = decoders.map((v) => v.pubsubTopic);
const singlePubsubTopic = pubsubTopics[0];
const contentTopics = decoders.map((v) => v.contentTopic);
log.info(
`Subscribing to contentTopics: ${contentTopics}, pubsubTopic: ${singlePubsubTopic}`
);
this.throwIfTopicNotSame(pubsubTopics);
this.throwIfTopicNotSupported(singlePubsubTopic);
let subscription = this.subscriptions.get(singlePubsubTopic);
if (!subscription) {
subscription = new Subscription({
pubsubTopic: singlePubsubTopic,
libp2p: this.libp2p,
protocol: this.protocol,
config: this.config,
peerManager: this.peerManager
});
subscription.start();
}
const result = await subscription.add(decoders, callback);
this.subscriptions.set(singlePubsubTopic, subscription);
log.info(
`Subscription ${result ? "successful" : "failed"} for content topic: ${contentTopics}`
);
return result;
}
public async unsubscribe<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[]
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
if (decoders.length === 0) {
throw Error("Cannot unsubscribe with 0 decoders.");
}
const pubsubTopics = decoders.map((v) => v.pubsubTopic);
const singlePubsubTopic = pubsubTopics[0];
const contentTopics = decoders.map((v) => v.contentTopic);
log.info(
`Unsubscribing from contentTopics: ${contentTopics}, pubsubTopic: ${singlePubsubTopic}`
);
this.throwIfTopicNotSame(pubsubTopics);
this.throwIfTopicNotSupported(singlePubsubTopic);
const subscription = this.subscriptions.get(singlePubsubTopic);
if (!subscription) {
log.warn("No subscriptions associated with the decoder.");
return false;
}
const result = await subscription.remove(decoders);
if (subscription.isEmpty()) {
log.warn("Subscription has no decoders anymore, terminating it.");
subscription.stop();
this.subscriptions.delete(singlePubsubTopic);
}
log.info(
`Creating filter subscription with ${peerIds.length} peers: `,
peerIds.map((id) => id.toString())
`Unsubscribing ${result ? "successful" : "failed"} for content topic: ${contentTopics}`
);
const subscription =
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new Subscription(
pubsubTopic,
this.protocol,
this.connectionManager,
this.peerManager,
this.libp2p,
this.config,
this.lightPush
)
);
return {
error: null,
subscription
};
return result;
}
/**
* This method is used to satisfy the `IReceiver` interface.
*
* @hidden
*
* @param decoders The decoders to use for the subscription.
* @param callback The callback function to use for the subscription.
* @param opts Optional protocol options for the subscription.
*
* @returns A Promise that resolves to a function that unsubscribes from the subscription.
*
* @remarks
* This method should not be used directly.
* Instead, use `createSubscription` to create a new subscription.
*/
public async subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<Unsubscribe> {
const uniquePubsubTopics = this.getUniquePubsubTopics<T>(decoders);
private async onIncomingMessage(
pubsubTopic: string,
message: WakuMessage,
peerId: string
): Promise<void> {
log.info(
`Received message for pubsubTopic:${pubsubTopic}, contentTopic:${message.contentTopic}, peerId:${peerId.toString()}`
);
if (uniquePubsubTopics.length === 0) {
const subscription = this.subscriptions.get(pubsubTopic);
if (!subscription) {
log.error(`No subscription locally registered for topic ${pubsubTopic}`);
return;
}
subscription.invoke(message, peerId);
}
// Limiting to one pubsubTopic for simplicity reasons, we can enable subscription for more than one PubsubTopic at once later when requested
private throwIfTopicNotSame(pubsubTopics: string[]): void {
const first = pubsubTopics[0];
const isSameTopic = pubsubTopics.every((t) => t === first);
if (!isSameTopic) {
throw Error(
"Failed to subscribe: no pubsubTopic found on decoders provided."
`Cannot subscribe to more than one pubsub topic at the same time, got pubsubTopics:${pubsubTopics}`
);
}
}
if (uniquePubsubTopics.length > 1) {
private throwIfTopicNotSupported(pubsubTopic: string): void {
const supportedPubsubTopic =
this.connectionManager.pubsubTopics.includes(pubsubTopic);
if (!supportedPubsubTopic) {
throw Error(
"Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile."
`Pubsub topic ${pubsubTopic} has not been configured on this instance.`
);
}
const { subscription, error } = await this.createSubscription(
uniquePubsubTopics[0]
);
if (error) {
throw Error(`Failed to create subscription: ${error}`);
}
await subscription.subscribe(decoders, callback);
const contentTopics = Array.from(
groupByContentTopic(
Array.isArray(decoders) ? decoders : [decoders]
).keys()
);
return async () => {
await subscription.unsubscribe(contentTopics);
};
}
public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}
private getActiveSubscription(
pubsubTopic: PubsubTopic
): Subscription | undefined {
return this.activeSubscriptions.get(pubsubTopic);
}
private setActiveSubscription(
pubsubTopic: PubsubTopic,
subscription: Subscription
): Subscription {
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
}
private getUniquePubsubTopics<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): string[] {
if (!Array.isArray(decoders)) {
return [decoders.pubsubTopic];
}
if (decoders.length === 0) {
return [];
}
const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic));
return [...pubsubTopics];
}
}

View File

@ -1,10 +1,10 @@
import type { PeerId } from "@libp2p/interface";
import { FilterCore } from "@waku/core";
import type {
FilterProtocolOptions,
IDecodedMessage,
IDecoder,
Libp2p,
NextFilterOptions
Libp2p
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { expect } from "chai";
@ -23,7 +23,7 @@ describe("Filter Subscription", () => {
let peerManager: PeerManager;
let subscription: Subscription;
let decoder: IDecoder<IDecodedMessage>;
let config: NextFilterOptions;
let config: FilterProtocolOptions;
beforeEach(() => {
libp2p = mockLibp2p();

View File

@ -1,260 +1,592 @@
import { ConnectionManager, createDecoder, FilterCore } from "@waku/core";
import {
type Callback,
type ContentTopic,
type CoreProtocolResult,
type EventHandler,
type PeerId,
TypedEventEmitter
} from "@libp2p/interface";
import { FilterCore, messageHashStr } from "@waku/core";
import type {
Callback,
FilterProtocolOptions,
type IDecodedMessage,
type IDecoder,
type ILightPush,
type IProtoMessage,
type ISubscription,
type Libp2p,
type PeerIdStr,
ProtocolError,
type PubsubTopic,
type SDKProtocolResult,
SubscriptionCallback
IDecodedMessage,
IDecoder,
IProtoMessage,
Libp2p
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { groupByContentTopic, Logger } from "@waku/utils";
import { Logger } from "@waku/utils";
import { PeerManager } from "../peer_manager/index.js";
import { SubscriptionMonitor } from "./subscription_monitor.js";
import { SubscriptionEvents, SubscriptionParams } from "./types.js";
import { TTLSet } from "./utils.js";
const log = new Logger("sdk:filter:subscription");
const log = new Logger("sdk:filter-subscription");
export class Subscription implements ISubscription {
private readonly monitor: SubscriptionMonitor;
type AttemptSubscribeParams = {
useNewContentTopics: boolean;
useOnlyNewPeers?: boolean;
};
private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
> = new Map();
type AttemptUnsubscribeParams = {
useNewContentTopics: boolean;
};
public constructor(
private readonly pubsubTopic: PubsubTopic,
private readonly protocol: FilterCore,
connectionManager: ConnectionManager,
peerManager: PeerManager,
libp2p: Libp2p,
private readonly config: FilterProtocolOptions,
lightPush?: ILightPush
) {
this.pubsubTopic = pubsubTopic;
export class Subscription {
private readonly libp2p: Libp2p;
private readonly pubsubTopic: string;
private readonly protocol: FilterCore;
private readonly peerManager: PeerManager;
this.monitor = new SubscriptionMonitor({
pubsubTopic,
config,
libp2p,
connectionManager,
filter: protocol,
peerManager,
lightPush,
activeSubscriptions: this.subscriptionCallbacks
});
private readonly config: FilterProtocolOptions;
private isStarted: boolean = false;
private inProgress: boolean = false;
private peers = new Set<PeerId>();
private peerFailures = new Map<PeerId, number>();
private readonly receivedMessages = new TTLSet<string>(60_000);
private callbacks = new Map<
IDecoder<IDecodedMessage>,
EventHandler<CustomEvent<WakuMessage>>
>();
private messageEmitter = new TypedEventEmitter<SubscriptionEvents>();
private toSubscribeContentTopics = new Set<string>();
private toUnsubscribeContentTopics = new Set<string>();
private subscribeIntervalId: number | null = null;
private keepAliveIntervalId: number | null = null;
private get contentTopics(): string[] {
const allTopics = Array.from(this.callbacks.keys()).map(
(k) => k.contentTopic
);
const uniqueTopics = new Set(allTopics).values();
return Array.from(uniqueTopics);
}
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
public constructor(params: SubscriptionParams) {
this.config = params.config;
this.pubsubTopic = params.pubsubTopic;
this.libp2p = params.libp2p;
this.protocol = params.protocol;
this.peerManager = params.peerManager;
this.onPeerConnected = this.onPeerConnected.bind(this);
this.onPeerDisconnected = this.onPeerDisconnected.bind(this);
}
public start(): void {
log.info(`Starting subscription for pubsubTopic: ${this.pubsubTopic}`);
if (this.isStarted || this.inProgress) {
log.info("Subscription already started or in progress, skipping start");
return;
}
this.inProgress = true;
void this.attemptSubscribe({
useNewContentTopics: false
});
this.setupSubscriptionInterval();
this.setupKeepAliveInterval();
this.setupEventListeners();
this.isStarted = true;
this.inProgress = false;
log.info(`Subscription started for pubsubTopic: ${this.pubsubTopic}`);
}
public stop(): void {
log.info(`Stopping subscription for pubsubTopic: ${this.pubsubTopic}`);
if (!this.isStarted || this.inProgress) {
log.info("Subscription not started or stop in progress, skipping stop");
return;
}
this.inProgress = true;
this.disposeEventListeners();
this.disposeIntervals();
void this.disposePeers();
this.disposeHandlers();
this.receivedMessages.dispose();
this.inProgress = false;
this.isStarted = false;
log.info(`Subscription stopped for pubsubTopic: ${this.pubsubTopic}`);
}
public isEmpty(): boolean {
return this.callbacks.size === 0;
}
public async add<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<SDKProtocolResult> {
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
// check that all decoders are configured for the same pubsub topic as this subscription
for (const decoder of decodersArray) {
if (decoder.pubsubTopic !== this.pubsubTopic) {
return {
failures: [
{
error: ProtocolError.TOPIC_DECODER_MISMATCH
}
],
successes: []
};
}
for (const decoder of decoders) {
this.addSingle(decoder, callback);
}
if (this.config.enableLightPushFilterCheck) {
decodersArray.push(
createDecoder(
this.monitor.reservedContentTopic,
this.pubsubTopic
) as IDecoder<T>
return this.toSubscribeContentTopics.size > 0
? await this.attemptSubscribe({ useNewContentTopics: true })
: true; // if content topic is not new - subscription, most likely exists
}
public async remove<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[]
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
for (const decoder of decoders) {
this.removeSingle(decoder);
}
return this.toUnsubscribeContentTopics.size > 0
? await this.attemptUnsubscribe({ useNewContentTopics: true })
: true; // no need to unsubscribe if there are other decoders on the contentTopic
}
public invoke(message: WakuMessage, _peerId: string): void {
if (this.isMessageReceived(message)) {
log.info(
`Skipping invoking callbacks for already received message: pubsubTopic:${this.pubsubTopic}, peerId:${_peerId.toString()}, contentTopic:${message.contentTopic}`
);
}
const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());
const peers = await this.monitor.getPeers();
const promises = peers.map(async (peer) => {
return this.protocol.subscribe(this.pubsubTopic, peer, contentTopics);
});
const results = await Promise.allSettled(promises);
const finalResult = this.handleResult(results, "subscribe");
// Save the callback functions by content topics so they
// can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`)
// is called for those content topics
decodersGroupedByCT.forEach((decoders, contentTopic) => {
// Cast the type because a given `subscriptionCallbacks` map may hold
// Decoder that decode to different implementations of `IDecodedMessage`
const subscriptionCallback = {
decoders,
callback
} as unknown as SubscriptionCallback<IDecodedMessage>;
// don't handle case of internal content topic
if (contentTopic === this.monitor.reservedContentTopic) {
return;
}
// The callback and decoder may override previous values, this is on
// purpose as the user may call `subscribe` to refresh the subscription
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});
this.monitor.start();
return finalResult;
}
public async unsubscribe(
contentTopics: ContentTopic[]
): Promise<SDKProtocolResult> {
const peers = await this.monitor.getPeers();
const promises = peers.map(async (peer) => {
const response = await this.protocol.unsubscribe(
this.pubsubTopic,
peer,
contentTopics
);
contentTopics.forEach((contentTopic: string) => {
this.subscriptionCallbacks.delete(contentTopic);
});
return response;
});
const results = await Promise.allSettled(promises);
const finalResult = this.handleResult(results, "unsubscribe");
if (this.subscriptionCallbacks.size === 0) {
this.monitor.stop();
}
return finalResult;
}
public async ping(): Promise<SDKProtocolResult> {
const peers = await this.monitor.getPeers();
const promises = peers.map((peer) => this.protocol.ping(peer));
const results = await Promise.allSettled(promises);
return this.handleResult(results, "ping");
}
public async unsubscribeAll(): Promise<SDKProtocolResult> {
const peers = await this.monitor.getPeers();
const promises = peers.map(async (peer) =>
this.protocol.unsubscribeAll(this.pubsubTopic, peer)
);
const results = await Promise.allSettled(promises);
this.subscriptionCallbacks.clear();
const finalResult = this.handleResult(results, "unsubscribeAll");
this.monitor.stop();
return finalResult;
}
public async processIncomingMessage(
message: WakuMessage,
peerIdStr: PeerIdStr
): Promise<void> {
const received = this.monitor.notifyMessageReceived(
peerIdStr,
message as IProtoMessage
);
if (received) {
log.info("Message already received, skipping");
return;
}
const { contentTopic } = message;
const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic);
if (!subscriptionCallback) {
log.error("No subscription callback available for ", contentTopic);
return;
}
log.info(
"Processing message with content topic ",
contentTopic,
" on pubsub topic ",
this.pubsubTopic
log.info(`Invoking message for contentTopic: ${message.contentTopic}`);
this.messageEmitter.dispatchEvent(
new CustomEvent<WakuMessage>(message.contentTopic, {
detail: message
})
);
await pushMessage(subscriptionCallback, this.pubsubTopic, message);
}
private handleResult(
results: PromiseSettledResult<CoreProtocolResult>[],
type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll"
): SDKProtocolResult {
const result: SDKProtocolResult = { failures: [], successes: [] };
private addSingle<T extends IDecodedMessage>(
decoder: IDecoder<T>,
callback: Callback<T>
): void {
log.info(`Adding subscription for contentTopic: ${decoder.contentTopic}`);
for (const promiseResult of results) {
if (promiseResult.status === "rejected") {
log.error(
`Failed to resolve ${type} promise successfully: `,
promiseResult.reason
);
result.failures.push({ error: ProtocolError.GENERIC_FAIL });
} else {
const coreResult = promiseResult.value;
if (coreResult.failure) {
result.failures.push(coreResult.failure);
} else {
result.successes.push(coreResult.success);
const isNewContentTopic = !this.contentTopics.includes(
decoder.contentTopic
);
if (isNewContentTopic) {
this.toSubscribeContentTopics.add(decoder.contentTopic);
}
if (this.callbacks.has(decoder)) {
log.warn(
`Replacing callback associated associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}`
);
const callback = this.callbacks.get(decoder);
this.callbacks.delete(decoder);
this.messageEmitter.removeEventListener(decoder.contentTopic, callback);
}
const eventHandler = (event: CustomEvent<WakuMessage>): void => {
void (async (): Promise<void> => {
try {
const message = await decoder.fromProtoObj(
decoder.pubsubTopic,
event.detail as IProtoMessage
);
void callback(message!);
} catch (err) {
log.error("Error decoding message", err);
}
}
})();
};
this.callbacks.set(decoder, eventHandler);
this.messageEmitter.addEventListener(decoder.contentTopic, eventHandler);
log.info(
`Subscription added for contentTopic: ${decoder.contentTopic}, isNewContentTopic: ${isNewContentTopic}`
);
}
private removeSingle<T extends IDecodedMessage>(decoder: IDecoder<T>): void {
log.info(`Removing subscription for contentTopic: ${decoder.contentTopic}`);
const callback = this.callbacks.get(decoder);
if (!callback) {
log.warn(
`No callback associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}`
);
}
return result;
}
}
async function pushMessage<T extends IDecodedMessage>(
subscriptionCallback: SubscriptionCallback<T>,
pubsubTopic: PubsubTopic,
message: WakuMessage
): Promise<void> {
const { decoders, callback } = subscriptionCallback;
this.callbacks.delete(decoder);
this.messageEmitter.removeEventListener(decoder.contentTopic, callback);
const { contentTopic } = message;
if (!contentTopic) {
log.warn("Message has no content topic, skipping");
return;
}
try {
const decodePromises = decoders.map((dec) =>
dec
.fromProtoObj(pubsubTopic, message as IProtoMessage)
.then((decoded) => decoded || Promise.reject("Decoding failed"))
const isCompletelyRemoved = !this.contentTopics.includes(
decoder.contentTopic
);
const decodedMessage = await Promise.any(decodePromises);
if (isCompletelyRemoved) {
this.toUnsubscribeContentTopics.add(decoder.contentTopic);
}
await callback(decodedMessage);
} catch (e) {
log.error("Error decoding message", e);
log.info(
`Subscription removed for contentTopic: ${decoder.contentTopic}, isCompletelyRemoved: ${isCompletelyRemoved}`
);
}
private isMessageReceived(message: WakuMessage): boolean {
try {
const messageHash = messageHashStr(
this.pubsubTopic,
message as IProtoMessage
);
if (this.receivedMessages.has(messageHash)) {
return true;
}
this.receivedMessages.add(messageHash);
} catch (e) {
// do nothing on throw, message will be handled as not received
}
return false;
}
private setupSubscriptionInterval(): void {
const subscriptionRefreshIntervalMs = 1000;
log.info(
`Setting up subscription interval with period ${subscriptionRefreshIntervalMs}ms`
);
this.subscribeIntervalId = setInterval(() => {
const run = async (): Promise<void> => {
if (this.toSubscribeContentTopics.size > 0) {
log.info(
`Subscription interval: ${this.toSubscribeContentTopics.size} topics to subscribe`
);
void (await this.attemptSubscribe({ useNewContentTopics: true }));
}
if (this.toUnsubscribeContentTopics.size > 0) {
log.info(
`Subscription interval: ${this.toUnsubscribeContentTopics.size} topics to unsubscribe`
);
void (await this.attemptUnsubscribe({ useNewContentTopics: true }));
}
};
void run();
}, subscriptionRefreshIntervalMs) as unknown as number;
}
private setupKeepAliveInterval(): void {
log.info(
`Setting up keep-alive interval with period ${this.config.keepAliveIntervalMs}ms`
);
this.keepAliveIntervalId = setInterval(() => {
const run = async (): Promise<void> => {
log.info(`Keep-alive interval running for ${this.peers.size} peers`);
let peersToReplace = await Promise.all(
Array.from(this.peers.values()).map(
async (peer): Promise<PeerId | undefined> => {
const response = await this.protocol.ping(peer);
if (response.success) {
log.info(`Ping successful for peer: ${peer.toString()}`);
this.peerFailures.set(peer, 0);
return;
}
let failures = this.peerFailures.get(peer) || 0;
failures += 1;
this.peerFailures.set(peer, failures);
log.warn(
`Ping failed for peer: ${peer.toString()}, failures: ${failures}/${this.config.pingsBeforePeerRenewed}`
);
if (failures < this.config.pingsBeforePeerRenewed) {
return;
}
log.info(
`Peer ${peer.toString()} exceeded max failures (${this.config.pingsBeforePeerRenewed}), will be replaced`
);
return peer;
}
)
);
peersToReplace = peersToReplace.filter((p) => !!p);
await Promise.all(
peersToReplace.map((p) => {
this.peers.delete(p as PeerId);
this.peerFailures.delete(p as PeerId);
return this.requestUnsubscribe(p as PeerId, this.contentTopics);
})
);
if (peersToReplace.length > 0) {
log.info(`Replacing ${peersToReplace.length} failed peers`);
void (await this.attemptSubscribe({
useNewContentTopics: false,
useOnlyNewPeers: true
}));
}
};
void run();
}, this.config.keepAliveIntervalMs) as unknown as number;
}
private setupEventListeners(): void {
this.libp2p.addEventListener(
"peer:connect",
(e) => void this.onPeerConnected(e)
);
this.libp2p.addEventListener(
"peer:disconnect",
(e) => void this.onPeerDisconnected(e)
);
}
private disposeIntervals(): void {
if (this.subscribeIntervalId) {
clearInterval(this.subscribeIntervalId);
}
if (this.keepAliveIntervalId) {
clearInterval(this.keepAliveIntervalId);
}
}
private disposeHandlers(): void {
for (const [decoder, handler] of this.callbacks.entries()) {
this.messageEmitter.removeEventListener(decoder.contentTopic, handler);
}
this.callbacks.clear();
}
private async disposePeers(): Promise<void> {
await this.attemptUnsubscribe({ useNewContentTopics: false });
this.peers.clear();
this.peerFailures = new Map();
}
private disposeEventListeners(): void {
this.libp2p.removeEventListener("peer:connect", this.onPeerConnected);
this.libp2p.removeEventListener("peer:disconnect", this.onPeerDisconnected);
}
private onPeerConnected(event: CustomEvent<PeerId>): void {
log.info(`Peer connected: ${event.detail.toString()}`);
// skip the peer we already subscribe to
if (this.peers.has(event.detail)) {
log.info(`Peer ${event.detail.toString()} already subscribed, skipping`);
return;
}
void this.attemptSubscribe({
useNewContentTopics: false,
useOnlyNewPeers: true
});
}
private onPeerDisconnected(event: CustomEvent<PeerId>): void {
log.info(`Peer disconnected: ${event.detail.toString()}`);
// ignore as the peer is not the one that is in use
if (!this.peers.has(event.detail)) {
log.info(
`Disconnected peer ${event.detail.toString()} not in use, ignoring`
);
return;
}
log.info(
`Active peer ${event.detail.toString()} disconnected, removing from peers list`
);
this.peers.delete(event.detail);
void this.attemptSubscribe({
useNewContentTopics: false,
useOnlyNewPeers: true
});
}
private async attemptSubscribe(
params: AttemptSubscribeParams
): Promise<boolean> {
const { useNewContentTopics, useOnlyNewPeers = false } = params;
const contentTopics = useNewContentTopics
? Array.from(this.toSubscribeContentTopics)
: this.contentTopics;
log.info(
`Attempting to subscribe: useNewContentTopics=${useNewContentTopics}, useOnlyNewPeers=${useOnlyNewPeers}, contentTopics=${contentTopics.length}`
);
if (!contentTopics.length) {
log.warn("Requested content topics is an empty array, skipping");
return false;
}
const prevPeers = new Set(this.peers);
const peersToAdd = this.peerManager.getPeers();
for (const peer of peersToAdd) {
if (this.peers.size >= this.config.numPeersToUse) {
break;
}
this.peers.add(peer);
}
const peersToUse = useOnlyNewPeers
? Array.from(this.peers.values()).filter((p) => !prevPeers.has(p))
: Array.from(this.peers.values());
log.info(
`Subscribing with ${peersToUse.length} peers for ${contentTopics.length} content topics`
);
if (useOnlyNewPeers && peersToUse.length === 0) {
log.warn(`Requested to use only new peers, but no peers found, skipping`);
return false;
}
const results = await Promise.all(
peersToUse.map((p) => this.requestSubscribe(p, contentTopics))
);
const successCount = results.filter((r) => r).length;
log.info(
`Subscribe attempts completed: ${successCount}/${results.length} successful`
);
if (useNewContentTopics) {
this.toSubscribeContentTopics = new Set();
}
return results.some((v) => v);
}
private async requestSubscribe(
peerId: PeerId,
contentTopics: string[]
): Promise<boolean> {
log.info(
`requestSubscribe: pubsubTopic:${this.pubsubTopic}\tcontentTopics:${contentTopics.join(",")}`
);
if (!contentTopics.length || !this.pubsubTopic) {
log.warn(
`requestSubscribe: no contentTopics or pubsubTopic provided, not sending subscribe request`
);
return false;
}
const response = await this.protocol.subscribe(
this.pubsubTopic,
peerId,
contentTopics
);
if (response.failure) {
log.warn(
`requestSubscribe: Failed to subscribe ${this.pubsubTopic} to ${peerId.toString()} with error:${response.failure.error} for contentTopics:${contentTopics}`
);
return false;
}
log.info(
`requestSubscribe: Subscribed ${this.pubsubTopic} to ${peerId.toString()} for contentTopics:${contentTopics}`
);
return true;
}
private async attemptUnsubscribe(
params: AttemptUnsubscribeParams
): Promise<boolean> {
const { useNewContentTopics } = params;
const contentTopics = useNewContentTopics
? Array.from(this.toUnsubscribeContentTopics)
: this.contentTopics;
log.info(
`Attempting to unsubscribe: useNewContentTopics=${useNewContentTopics}, contentTopics=${contentTopics.length}`
);
if (!contentTopics.length) {
log.warn("Requested content topics is an empty array, skipping");
return false;
}
const peersToUse = Array.from(this.peers.values());
const result = await Promise.all(
peersToUse.map((p) =>
this.requestUnsubscribe(
p,
useNewContentTopics ? contentTopics : undefined
)
)
);
const successCount = result.filter((r) => r).length;
log.info(
`Unsubscribe attempts completed: ${successCount}/${result.length} successful`
);
if (useNewContentTopics) {
this.toUnsubscribeContentTopics = new Set();
}
return result.some((v) => v);
}
private async requestUnsubscribe(
peerId: PeerId,
contentTopics?: string[]
): Promise<boolean> {
const response = contentTopics
? await this.protocol.unsubscribe(this.pubsubTopic, peerId, contentTopics)
: await this.protocol.unsubscribeAll(this.pubsubTopic, peerId);
if (response.failure) {
log.warn(
`requestUnsubscribe: Failed to unsubscribe for pubsubTopic:${this.pubsubTopic} from peerId:${peerId.toString()} with error:${response.failure?.error} for contentTopics:${contentTopics}`
);
return false;
}
log.info(
`requestUnsubscribe: Unsubscribed pubsubTopic:${this.pubsubTopic} from peerId:${peerId.toString()} for contentTopics:${contentTopics}`
);
return true;
}
}

View File

@ -1,292 +0,0 @@
import type { EventHandler, PeerId } from "@libp2p/interface";
import { FilterCore, messageHashStr } from "@waku/core";
import type {
FilterProtocolOptions,
IConnectionManager,
ILightPush,
IProtoMessage,
Libp2p
} from "@waku/interfaces";
import { EConnectionStateEvents } from "@waku/interfaces";
import { PeerManager } from "../peer_manager/index.js";
// TODO(weboko): consider adding as config property or combine with maxAllowedPings
const MAX_SUBSCRIBE_ATTEMPTS = 3;
type SubscriptionMonitorConstructorOptions = {
pubsubTopic: string;
config: FilterProtocolOptions;
libp2p: Libp2p;
connectionManager: IConnectionManager;
filter: FilterCore;
peerManager: PeerManager;
lightPush?: ILightPush;
activeSubscriptions: Map<string, unknown>;
};
export class SubscriptionMonitor {
/**
* Cached peers that are in use by subscription.
* Needed to understand if they disconnect later or not.
*/
public peerIds: PeerId[] = [];
private isStarted: boolean = false;
private readonly pubsubTopic: string;
private readonly config: FilterProtocolOptions;
private readonly libp2p: Libp2p;
private readonly filter: FilterCore;
private readonly peerManager: PeerManager;
private readonly connectionManager: IConnectionManager;
private readonly activeSubscriptions: Map<string, unknown>;
private keepAliveIntervalId: number | undefined;
private pingFailedAttempts = new Map<string, number>();
private receivedMessagesFormPeer = new Set<string>();
private receivedMessages = new Set<string>();
private verifiedPeers = new Set<string>();
public constructor(options: SubscriptionMonitorConstructorOptions) {
this.config = options.config;
this.connectionManager = options.connectionManager;
this.filter = options.filter;
this.peerManager = options.peerManager;
this.libp2p = options.libp2p;
this.activeSubscriptions = options.activeSubscriptions;
this.pubsubTopic = options.pubsubTopic;
this.onConnectionChange = this.onConnectionChange.bind(this);
this.onPeerConnected = this.onPeerConnected.bind(this);
this.onPeerDisconnected = this.onPeerDisconnected.bind(this);
}
/**
* @returns content topic used for Filter verification
*/
public get reservedContentTopic(): string {
return `/js-waku-subscription-ping/1/${this.libp2p.peerId.toString()}/utf8`;
}
/**
* Starts:
* - recurring ping queries;
* - connection event observers;
*/
public start(): void {
if (this.isStarted) {
return;
}
this.isStarted = true;
this.startKeepAlive();
this.startConnectionListener();
this.startPeerConnectionListener();
}
/**
* Stops all recurring queries, event listeners or timers.
*/
public stop(): void {
if (!this.isStarted) {
return;
}
this.isStarted = false;
this.stopKeepAlive();
this.stopConnectionListener();
this.stopPeerConnectionListener();
}
/**
* Method to get peers that are used by particular subscription or, if initially called, peers that can be used by subscription.
* @returns array of peers
*/
public async getPeers(): Promise<PeerId[]> {
if (!this.isStarted) {
this.peerIds = this.peerManager.getPeers();
}
return this.peerIds;
}
/**
* Notifies monitor if message was received.
*
* @param peerId peer from which message is received
* @param message received message
*
* @returns true if message was received from peer
*/
public notifyMessageReceived(
peerId: string,
message: IProtoMessage
): boolean {
const hash = this.buildMessageHash(message);
this.verifiedPeers.add(peerId);
this.receivedMessagesFormPeer.add(`${peerId}-${hash}`);
if (this.receivedMessages.has(hash)) {
return true;
}
this.receivedMessages.add(hash);
return false;
}
private buildMessageHash(message: IProtoMessage): string {
return messageHashStr(this.pubsubTopic, message);
}
private startConnectionListener(): void {
this.connectionManager.addEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
this.onConnectionChange as (v: CustomEvent<boolean>) => void
);
}
private stopConnectionListener(): void {
this.connectionManager.removeEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
this.onConnectionChange as (v: CustomEvent<boolean>) => void
);
}
private async onConnectionChange({
detail: isConnected
}: CustomEvent<boolean>): Promise<void> {
if (!isConnected) {
this.stopKeepAlive();
return;
}
await Promise.all(this.peerIds.map((id) => this.ping(id, true)));
this.startKeepAlive();
}
private startKeepAlive(): void {
if (this.keepAliveIntervalId) {
return;
}
this.keepAliveIntervalId = setInterval(() => {
void this.peerIds.map((id) => this.ping(id));
}, this.config.keepAliveIntervalMs) as unknown as number;
}
private stopKeepAlive(): void {
if (!this.keepAliveIntervalId) {
return;
}
clearInterval(this.keepAliveIntervalId);
this.keepAliveIntervalId = undefined;
}
private startPeerConnectionListener(): void {
this.libp2p.addEventListener(
"peer:connect",
this.onPeerConnected as EventHandler<CustomEvent<PeerId | undefined>>
);
this.libp2p.addEventListener(
"peer:disconnect",
this.onPeerDisconnected as EventHandler<CustomEvent<PeerId | undefined>>
);
}
private stopPeerConnectionListener(): void {
this.libp2p.removeEventListener(
"peer:connect",
this.onPeerConnected as EventHandler<CustomEvent<PeerId | undefined>>
);
this.libp2p.removeEventListener(
"peer:disconnect",
this.onPeerDisconnected as EventHandler<CustomEvent<PeerId | undefined>>
);
}
// this method keeps track of new connections and will trigger subscribe request if needed
private async onPeerConnected(_event: CustomEvent<PeerId>): Promise<void> {
// TODO(weboko): use config.numOfUsedPeers instead of this.peers
const hasSomePeers = this.peerIds.length > 0;
if (hasSomePeers) {
return;
}
this.peerIds = this.peerManager.getPeers();
await Promise.all(this.peerIds.map((id) => this.subscribe(id)));
}
// this method keeps track of disconnects and will trigger subscribe request if needed
private async onPeerDisconnected(event: CustomEvent<PeerId>): Promise<void> {
const hasNotBeenUsed = !this.peerIds.find((id) => id.equals(event.detail));
if (hasNotBeenUsed) {
return;
}
this.peerIds = this.peerManager.getPeers();
// we trigger subscribe for peer that was used before
// it will expectedly fail and we will initiate addition of a new peer
await Promise.all(this.peerIds.map((id) => this.subscribe(id)));
}
private async subscribe(_peerId: PeerId | undefined): Promise<void> {
let peerId: PeerId | undefined = _peerId;
for (let i = 0; i < MAX_SUBSCRIBE_ATTEMPTS; i++) {
if (!peerId) {
return;
}
const response = await this.filter.subscribe(
this.pubsubTopic,
peerId,
Array.from(this.activeSubscriptions.keys())
);
if (response.success) {
return;
}
peerId = this.peerManager.requestRenew(peerId);
}
}
private async ping(
peerId: PeerId,
renewOnFirstFail: boolean = false
): Promise<void> {
const peerIdStr = peerId.toString();
const response = await this.filter.ping(peerId);
if (response.failure && renewOnFirstFail) {
const newPeer = this.peerManager.requestRenew(peerId);
await this.subscribe(newPeer);
return;
}
if (response.failure) {
const prev = this.pingFailedAttempts.get(peerIdStr) || 0;
this.pingFailedAttempts.set(peerIdStr, prev + 1);
}
if (response.success) {
this.pingFailedAttempts.set(peerIdStr, 0);
}
const madeAttempts = this.pingFailedAttempts.get(peerIdStr) || 0;
if (madeAttempts >= this.config.pingsBeforePeerRenewed) {
const newPeer = this.peerManager.requestRenew(peerId);
await this.subscribe(newPeer);
}
}
}

View File

@ -1,12 +1,12 @@
import { ConnectionManager } from "@waku/core";
import { FilterCore } from "@waku/core";
import type { Libp2p, NextFilterOptions } from "@waku/interfaces";
import type { FilterProtocolOptions, Libp2p } from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { PeerManager } from "../peer_manager/index.js";
export type FilterConstructorParams = {
options?: Partial<NextFilterOptions>;
options?: Partial<FilterProtocolOptions>;
libp2p: Libp2p;
peerManager: PeerManager;
connectionManager: ConnectionManager;
@ -20,6 +20,6 @@ export type SubscriptionParams = {
libp2p: Libp2p;
pubsubTopic: string;
protocol: FilterCore;
config: NextFilterOptions;
config: FilterProtocolOptions;
peerManager: PeerManager;
};

View File

@ -1,15 +1,48 @@
import { FilterProtocolOptions } from "@waku/interfaces";
export class TTLSet<T> {
private readonly ttlMs: number;
private cleanupIntervalId: number | null = null;
private readonly entryTimestamps = new Map<T, number>();
import * as C from "./constants.js";
/**
* Creates a new CustomSet with TTL functionality.
* @param ttlMs - The time-to-live in milliseconds for each entry.
* @param cleanupIntervalMs - Optional interval between cleanup operations (default: 5000ms).
*/
public constructor(ttlMs: number, cleanupIntervalMs: number = 5000) {
this.ttlMs = ttlMs;
this.startCleanupInterval(cleanupIntervalMs);
}
export const buildConfig = (
config?: Partial<FilterProtocolOptions>
): FilterProtocolOptions => {
return {
keepAliveIntervalMs: config?.keepAliveIntervalMs || C.DEFAULT_KEEP_ALIVE,
pingsBeforePeerRenewed:
config?.pingsBeforePeerRenewed || C.DEFAULT_MAX_PINGS,
enableLightPushFilterCheck:
config?.enableLightPushFilterCheck || C.DEFAULT_LIGHT_PUSH_FILTER_CHECK
};
};
public dispose(): void {
if (this.cleanupIntervalId !== null) {
clearInterval(this.cleanupIntervalId);
this.cleanupIntervalId = null;
}
this.entryTimestamps.clear();
}
public add(entry: T): this {
this.entryTimestamps.set(entry, Date.now());
return this;
}
public has(entry: T): boolean {
return this.entryTimestamps.has(entry);
}
private startCleanupInterval(intervalMs: number): void {
this.cleanupIntervalId = setInterval(() => {
this.removeExpiredEntries();
}, intervalMs) as unknown as number;
}
private removeExpiredEntries(): void {
const now = Date.now();
for (const [entry, timestamp] of this.entryTimestamps.entries()) {
if (now - timestamp > this.ttlMs) {
this.entryTimestamps.delete(entry);
}
}
}
}

View File

@ -1,259 +0,0 @@
import { ConnectionManager, FilterCore } from "@waku/core";
import type {
Callback,
NextFilterOptions as FilterOptions,
IDecodedMessage,
IDecoder,
INextFilter as IFilter,
Libp2p
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
import { PeerManager } from "../peer_manager/index.js";
import { Subscription } from "./subscription.js";
import { FilterConstructorParams } from "./types.js";
const log = new Logger("sdk:next-filter");
type PubsubTopic = string;
export class Filter implements IFilter {
private readonly libp2p: Libp2p;
private readonly protocol: FilterCore;
private readonly peerManager: PeerManager;
private readonly connectionManager: ConnectionManager;
private readonly config: FilterOptions;
private subscriptions = new Map<PubsubTopic, Subscription>();
public constructor(params: FilterConstructorParams) {
this.config = {
numPeersToUse: 2,
pingsBeforePeerRenewed: 3,
keepAliveIntervalMs: 60_000,
...params.options
};
this.libp2p = params.libp2p;
this.peerManager = params.peerManager;
this.connectionManager = params.connectionManager;
this.protocol = new FilterCore(
this.onIncomingMessage.bind(this),
params.connectionManager.pubsubTopics,
params.libp2p
);
}
public get multicodec(): string {
return this.protocol.multicodec;
}
/**
* Unsubscribes from all active subscriptions across all pubsub topics.
*
* @example
* // Clean up all subscriptions when React component unmounts
* useEffect(() => {
* return () => filter.unsubscribeAll();
* }, [filter]);
*
* @example
* // Reset subscriptions and start over
* filter.unsubscribeAll();
* await filter.subscribe(newDecoder, newCallback);
*/
public unsubscribeAll(): void {
for (const subscription of this.subscriptions.values()) {
subscription.stop();
}
this.subscriptions.clear();
}
/**
* Subscribes to messages with specified decoders and executes callback when a message is received.
* In case no peers available initially - will delay subscription till connects to any peer.
*
* @param decoders - Single decoder or array of decoders to subscribe to. All decoders must share the same pubsubTopic.
* @param callback - Function called when a message matching the decoder's contentTopic is received.
* @returns Promise that resolves to true if subscription was successful, false otherwise.
*
* @example
* // Subscribe to a single content topic
* await filter.subscribe(decoder, (msg) => console.log(msg));
*
* @example
* // Subscribe to multiple content topics with the same pubsub topic
* await filter.subscribe([decoder1, decoder2], (msg) => console.log(msg));
*
* @example
* // Handle subscription failure
* const success = await filter.subscribe(decoder, handleMessage);
* if (!success) {
* console.error("Failed to subscribe");
* }
*/
public async subscribe<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
if (decoders.length === 0) {
throw Error("Cannot subscribe with 0 decoders.");
}
const pubsubTopics = decoders.map((v) => v.pubsubTopic);
const contentTopics = decoders.map((v) => v.contentTopic);
// doing this for simplicity, we can enable subscription for more than one PubsubTopic at once later when requested
if (!this.isSamePubsubTopic(decoders)) {
throw Error(
`Cannot subscribe to more than one pubsub topic at the same time, got pubsubTopics:${pubsubTopics}`
);
}
log.info(
`Subscribing to content topic: ${contentTopics}, pubsub topic: ${pubsubTopics}`
);
const supportedPubsubTopic = this.connectionManager.pubsubTopics.includes(
pubsubTopics[0]
);
if (!supportedPubsubTopic) {
throw Error(
`Pubsub topic ${pubsubTopics[0]} has not been configured on this instance.`
);
}
let subscription = this.subscriptions.get(pubsubTopics[0]);
if (!subscription) {
subscription = new Subscription({
pubsubTopic: pubsubTopics[0],
libp2p: this.libp2p,
protocol: this.protocol,
config: this.config,
peerManager: this.peerManager
});
subscription.start();
}
const result = await subscription.add(decoders, callback);
this.subscriptions.set(pubsubTopics[0], subscription);
log.info(
`Subscription ${result ? "successful" : "failed"} for content topic: ${contentTopics}`
);
return result;
}
/**
* Unsubscribes from messages with specified decoders.
*
* @param decoders - Single decoder or array of decoders to unsubscribe from. All decoders must share the same pubsubTopic.
* @returns Promise that resolves to true if unsubscription was successful, false otherwise.
*
* @example
* // Unsubscribe from a single decoder
* await filter.unsubscribe(decoder);
*
* @example
* // Unsubscribe from multiple decoders at once
* await filter.unsubscribe([decoder1, decoder2]);
*
* @example
* // Handle unsubscription failure
* const success = await filter.unsubscribe(decoder);
* if (!success) {
* console.error("Failed to unsubscribe");
* }
*/
public async unsubscribe<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[]
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
if (decoders.length === 0) {
throw Error("Cannot unsubscribe with 0 decoders.");
}
const pubsubTopics = decoders.map((v) => v.pubsubTopic);
const contentTopics = decoders.map((v) => v.contentTopic);
// doing this for simplicity, we can enable unsubscribing with more than one PubsubTopic at once later when requested
if (!this.isSamePubsubTopic(decoders)) {
throw Error(
`Cannot unsubscribe with more than one pubsub topic at the same time, got pubsubTopics:${pubsubTopics}`
);
}
log.info(
`Unsubscribing from content topic: ${contentTopics}, pubsub topic: ${pubsubTopics}`
);
const supportedPubsubTopic = this.connectionManager.pubsubTopics.includes(
pubsubTopics[0]
);
if (!supportedPubsubTopic) {
throw Error(
`Pubsub topic ${pubsubTopics[0]} has not been configured on this instance.`
);
}
const subscription = this.subscriptions.get(pubsubTopics[0]);
if (!subscription) {
log.warn("No subscriptions associated with the decoder.");
return false;
}
const result = await subscription.remove(decoders);
if (subscription.isEmpty()) {
log.warn("Subscription has no decoders anymore, terminating it.");
subscription.stop();
this.subscriptions.delete(pubsubTopics[0]);
}
log.info(
`Unsubscribing ${result ? "successful" : "failed"} for content topic: ${contentTopics}`
);
return result;
}
private async onIncomingMessage(
pubsubTopic: string,
message: WakuMessage,
peerId: string
): Promise<void> {
log.info(
`Received message for pubsubTopic:${pubsubTopic}, contentTopic:${message.contentTopic}, peerId:${peerId.toString()}`
);
const subscription = this.subscriptions.get(pubsubTopic);
if (!subscription) {
log.error(`No subscription locally registered for topic ${pubsubTopic}`);
return;
}
subscription.invoke(message, peerId);
}
private isSamePubsubTopic<T extends IDecodedMessage>(
decoders: IDecoder<T>[]
): boolean {
const topics = new Set<string>();
for (const decoder of decoders) {
topics.add(decoder.pubsubTopic);
}
return topics.size === 1;
}
}

View File

@ -1 +0,0 @@
export { Filter as NextFilter } from "./filter.js";

View File

@ -1,592 +0,0 @@
import {
type EventHandler,
type PeerId,
TypedEventEmitter
} from "@libp2p/interface";
import { FilterCore, messageHashStr } from "@waku/core";
import type {
Callback,
NextFilterOptions as FilterOptions,
IDecodedMessage,
IDecoder,
IProtoMessage,
Libp2p
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
import { PeerManager } from "../peer_manager/index.js";
import { SubscriptionEvents, SubscriptionParams } from "./types.js";
import { TTLSet } from "./utils.js";
const log = new Logger("sdk:filter-subscription");
type AttemptSubscribeParams = {
useNewContentTopics: boolean;
useOnlyNewPeers?: boolean;
};
type AttemptUnsubscribeParams = {
useNewContentTopics: boolean;
};
export class Subscription {
private readonly libp2p: Libp2p;
private readonly pubsubTopic: string;
private readonly protocol: FilterCore;
private readonly peerManager: PeerManager;
private readonly config: FilterOptions;
private isStarted: boolean = false;
private inProgress: boolean = false;
private peers = new Set<PeerId>();
private peerFailures = new Map<PeerId, number>();
private readonly receivedMessages = new TTLSet<string>(60_000);
private callbacks = new Map<
IDecoder<IDecodedMessage>,
EventHandler<CustomEvent<WakuMessage>>
>();
private messageEmitter = new TypedEventEmitter<SubscriptionEvents>();
private toSubscribeContentTopics = new Set<string>();
private toUnsubscribeContentTopics = new Set<string>();
private subscribeIntervalId: number | null = null;
private keepAliveIntervalId: number | null = null;
private get contentTopics(): string[] {
const allTopics = Array.from(this.callbacks.keys()).map(
(k) => k.contentTopic
);
const uniqueTopics = new Set(allTopics).values();
return Array.from(uniqueTopics);
}
public constructor(params: SubscriptionParams) {
this.config = params.config;
this.pubsubTopic = params.pubsubTopic;
this.libp2p = params.libp2p;
this.protocol = params.protocol;
this.peerManager = params.peerManager;
this.onPeerConnected = this.onPeerConnected.bind(this);
this.onPeerDisconnected = this.onPeerDisconnected.bind(this);
}
public start(): void {
log.info(`Starting subscription for pubsubTopic: ${this.pubsubTopic}`);
if (this.isStarted || this.inProgress) {
log.info("Subscription already started or in progress, skipping start");
return;
}
this.inProgress = true;
void this.attemptSubscribe({
useNewContentTopics: false
});
this.setupSubscriptionInterval();
this.setupKeepAliveInterval();
this.setupEventListeners();
this.isStarted = true;
this.inProgress = false;
log.info(`Subscription started for pubsubTopic: ${this.pubsubTopic}`);
}
public stop(): void {
log.info(`Stopping subscription for pubsubTopic: ${this.pubsubTopic}`);
if (!this.isStarted || this.inProgress) {
log.info("Subscription not started or stop in progress, skipping stop");
return;
}
this.inProgress = true;
this.disposeEventListeners();
this.disposeIntervals();
void this.disposePeers();
this.disposeHandlers();
this.receivedMessages.dispose();
this.inProgress = false;
this.isStarted = false;
log.info(`Subscription stopped for pubsubTopic: ${this.pubsubTopic}`);
}
public isEmpty(): boolean {
return this.callbacks.size === 0;
}
public async add<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
for (const decoder of decoders) {
this.addSingle(decoder, callback);
}
return this.toSubscribeContentTopics.size > 0
? await this.attemptSubscribe({ useNewContentTopics: true })
: true; // if content topic is not new - subscription, most likely exists
}
public async remove<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[]
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
for (const decoder of decoders) {
this.removeSingle(decoder);
}
return this.toUnsubscribeContentTopics.size > 0
? await this.attemptUnsubscribe({ useNewContentTopics: true })
: true; // no need to unsubscribe if there are other decoders on the contentTopic
}
public invoke(message: WakuMessage, _peerId: string): void {
if (this.isMessageReceived(message)) {
log.info(
`Skipping invoking callbacks for already received message: pubsubTopic:${this.pubsubTopic}, peerId:${_peerId.toString()}, contentTopic:${message.contentTopic}`
);
return;
}
log.info(`Invoking message for contentTopic: ${message.contentTopic}`);
this.messageEmitter.dispatchEvent(
new CustomEvent<WakuMessage>(message.contentTopic, {
detail: message
})
);
}
private addSingle<T extends IDecodedMessage>(
decoder: IDecoder<T>,
callback: Callback<T>
): void {
log.info(`Adding subscription for contentTopic: ${decoder.contentTopic}`);
const isNewContentTopic = !this.contentTopics.includes(
decoder.contentTopic
);
if (isNewContentTopic) {
this.toSubscribeContentTopics.add(decoder.contentTopic);
}
if (this.callbacks.has(decoder)) {
log.warn(
`Replacing callback associated associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}`
);
const callback = this.callbacks.get(decoder);
this.callbacks.delete(decoder);
this.messageEmitter.removeEventListener(decoder.contentTopic, callback);
}
const eventHandler = (event: CustomEvent<WakuMessage>): void => {
void (async (): Promise<void> => {
try {
const message = await decoder.fromProtoObj(
decoder.pubsubTopic,
event.detail as IProtoMessage
);
void callback(message!);
} catch (err) {
log.error("Error decoding message", err);
}
})();
};
this.callbacks.set(decoder, eventHandler);
this.messageEmitter.addEventListener(decoder.contentTopic, eventHandler);
log.info(
`Subscription added for contentTopic: ${decoder.contentTopic}, isNewContentTopic: ${isNewContentTopic}`
);
}
private removeSingle<T extends IDecodedMessage>(decoder: IDecoder<T>): void {
log.info(`Removing subscription for contentTopic: ${decoder.contentTopic}`);
const callback = this.callbacks.get(decoder);
if (!callback) {
log.warn(
`No callback associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}`
);
}
this.callbacks.delete(decoder);
this.messageEmitter.removeEventListener(decoder.contentTopic, callback);
const isCompletelyRemoved = !this.contentTopics.includes(
decoder.contentTopic
);
if (isCompletelyRemoved) {
this.toUnsubscribeContentTopics.add(decoder.contentTopic);
}
log.info(
`Subscription removed for contentTopic: ${decoder.contentTopic}, isCompletelyRemoved: ${isCompletelyRemoved}`
);
}
private isMessageReceived(message: WakuMessage): boolean {
try {
const messageHash = messageHashStr(
this.pubsubTopic,
message as IProtoMessage
);
if (this.receivedMessages.has(messageHash)) {
return true;
}
this.receivedMessages.add(messageHash);
} catch (e) {
// do nothing on throw, message will be handled as not received
}
return false;
}
private setupSubscriptionInterval(): void {
const subscriptionRefreshIntervalMs = 1000;
log.info(
`Setting up subscription interval with period ${subscriptionRefreshIntervalMs}ms`
);
this.subscribeIntervalId = setInterval(() => {
const run = async (): Promise<void> => {
if (this.toSubscribeContentTopics.size > 0) {
log.info(
`Subscription interval: ${this.toSubscribeContentTopics.size} topics to subscribe`
);
void (await this.attemptSubscribe({ useNewContentTopics: true }));
}
if (this.toUnsubscribeContentTopics.size > 0) {
log.info(
`Subscription interval: ${this.toUnsubscribeContentTopics.size} topics to unsubscribe`
);
void (await this.attemptUnsubscribe({ useNewContentTopics: true }));
}
};
void run();
}, subscriptionRefreshIntervalMs) as unknown as number;
}
private setupKeepAliveInterval(): void {
log.info(
`Setting up keep-alive interval with period ${this.config.keepAliveIntervalMs}ms`
);
this.keepAliveIntervalId = setInterval(() => {
const run = async (): Promise<void> => {
log.info(`Keep-alive interval running for ${this.peers.size} peers`);
let peersToReplace = await Promise.all(
Array.from(this.peers.values()).map(
async (peer): Promise<PeerId | undefined> => {
const response = await this.protocol.ping(peer);
if (response.success) {
log.info(`Ping successful for peer: ${peer.toString()}`);
this.peerFailures.set(peer, 0);
return;
}
let failures = this.peerFailures.get(peer) || 0;
failures += 1;
this.peerFailures.set(peer, failures);
log.warn(
`Ping failed for peer: ${peer.toString()}, failures: ${failures}/${this.config.pingsBeforePeerRenewed}`
);
if (failures < this.config.pingsBeforePeerRenewed) {
return;
}
log.info(
`Peer ${peer.toString()} exceeded max failures (${this.config.pingsBeforePeerRenewed}), will be replaced`
);
return peer;
}
)
);
peersToReplace = peersToReplace.filter((p) => !!p);
await Promise.all(
peersToReplace.map((p) => {
this.peers.delete(p as PeerId);
this.peerFailures.delete(p as PeerId);
return this.requestUnsubscribe(p as PeerId, this.contentTopics);
})
);
if (peersToReplace.length > 0) {
log.info(`Replacing ${peersToReplace.length} failed peers`);
void (await this.attemptSubscribe({
useNewContentTopics: false,
useOnlyNewPeers: true
}));
}
};
void run();
}, this.config.keepAliveIntervalMs) as unknown as number;
}
private setupEventListeners(): void {
this.libp2p.addEventListener(
"peer:connect",
(e) => void this.onPeerConnected(e)
);
this.libp2p.addEventListener(
"peer:disconnect",
(e) => void this.onPeerDisconnected(e)
);
}
private disposeIntervals(): void {
if (this.subscribeIntervalId) {
clearInterval(this.subscribeIntervalId);
}
if (this.keepAliveIntervalId) {
clearInterval(this.keepAliveIntervalId);
}
}
private disposeHandlers(): void {
for (const [decoder, handler] of this.callbacks.entries()) {
this.messageEmitter.removeEventListener(decoder.contentTopic, handler);
}
this.callbacks.clear();
}
private async disposePeers(): Promise<void> {
await this.attemptUnsubscribe({ useNewContentTopics: false });
this.peers.clear();
this.peerFailures = new Map();
}
private disposeEventListeners(): void {
this.libp2p.removeEventListener("peer:connect", this.onPeerConnected);
this.libp2p.removeEventListener("peer:disconnect", this.onPeerDisconnected);
}
private onPeerConnected(event: CustomEvent<PeerId>): void {
log.info(`Peer connected: ${event.detail.toString()}`);
// skip the peer we already subscribe to
if (this.peers.has(event.detail)) {
log.info(`Peer ${event.detail.toString()} already subscribed, skipping`);
return;
}
void this.attemptSubscribe({
useNewContentTopics: false,
useOnlyNewPeers: true
});
}
private onPeerDisconnected(event: CustomEvent<PeerId>): void {
log.info(`Peer disconnected: ${event.detail.toString()}`);
// ignore as the peer is not the one that is in use
if (!this.peers.has(event.detail)) {
log.info(
`Disconnected peer ${event.detail.toString()} not in use, ignoring`
);
return;
}
log.info(
`Active peer ${event.detail.toString()} disconnected, removing from peers list`
);
this.peers.delete(event.detail);
void this.attemptSubscribe({
useNewContentTopics: false,
useOnlyNewPeers: true
});
}
private async attemptSubscribe(
params: AttemptSubscribeParams
): Promise<boolean> {
const { useNewContentTopics, useOnlyNewPeers = false } = params;
const contentTopics = useNewContentTopics
? Array.from(this.toSubscribeContentTopics)
: this.contentTopics;
log.info(
`Attempting to subscribe: useNewContentTopics=${useNewContentTopics}, useOnlyNewPeers=${useOnlyNewPeers}, contentTopics=${contentTopics.length}`
);
if (!contentTopics.length) {
log.warn("Requested content topics is an empty array, skipping");
return false;
}
const prevPeers = new Set(this.peers);
const peersToAdd = this.peerManager.getPeers();
for (const peer of peersToAdd) {
if (this.peers.size >= this.config.numPeersToUse) {
break;
}
this.peers.add(peer);
}
const peersToUse = useOnlyNewPeers
? Array.from(this.peers.values()).filter((p) => !prevPeers.has(p))
: Array.from(this.peers.values());
log.info(
`Subscribing with ${peersToUse.length} peers for ${contentTopics.length} content topics`
);
if (useOnlyNewPeers && peersToUse.length === 0) {
log.warn(`Requested to use only new peers, but no peers found, skipping`);
return false;
}
const results = await Promise.all(
peersToUse.map((p) => this.requestSubscribe(p, contentTopics))
);
const successCount = results.filter((r) => r).length;
log.info(
`Subscribe attempts completed: ${successCount}/${results.length} successful`
);
if (useNewContentTopics) {
this.toSubscribeContentTopics = new Set();
}
return results.some((v) => v);
}
private async requestSubscribe(
peerId: PeerId,
contentTopics: string[]
): Promise<boolean> {
log.info(
`requestSubscribe: pubsubTopic:${this.pubsubTopic}\tcontentTopics:${contentTopics.join(",")}`
);
if (!contentTopics.length || !this.pubsubTopic) {
log.warn(
`requestSubscribe: no contentTopics or pubsubTopic provided, not sending subscribe request`
);
return false;
}
const response = await this.protocol.subscribe(
this.pubsubTopic,
peerId,
contentTopics
);
if (response.failure) {
log.warn(
`requestSubscribe: Failed to subscribe ${this.pubsubTopic} to ${peerId.toString()} with error:${response.failure.error} for contentTopics:${contentTopics}`
);
return false;
}
log.info(
`requestSubscribe: Subscribed ${this.pubsubTopic} to ${peerId.toString()} for contentTopics:${contentTopics}`
);
return true;
}
private async attemptUnsubscribe(
params: AttemptUnsubscribeParams
): Promise<boolean> {
const { useNewContentTopics } = params;
const contentTopics = useNewContentTopics
? Array.from(this.toUnsubscribeContentTopics)
: this.contentTopics;
log.info(
`Attempting to unsubscribe: useNewContentTopics=${useNewContentTopics}, contentTopics=${contentTopics.length}`
);
if (!contentTopics.length) {
log.warn("Requested content topics is an empty array, skipping");
return false;
}
const peersToUse = Array.from(this.peers.values());
const result = await Promise.all(
peersToUse.map((p) =>
this.requestUnsubscribe(
p,
useNewContentTopics ? contentTopics : undefined
)
)
);
const successCount = result.filter((r) => r).length;
log.info(
`Unsubscribe attempts completed: ${successCount}/${result.length} successful`
);
if (useNewContentTopics) {
this.toUnsubscribeContentTopics = new Set();
}
return result.some((v) => v);
}
private async requestUnsubscribe(
peerId: PeerId,
contentTopics?: string[]
): Promise<boolean> {
const response = contentTopics
? await this.protocol.unsubscribe(this.pubsubTopic, peerId, contentTopics)
: await this.protocol.unsubscribeAll(this.pubsubTopic, peerId);
if (response.failure) {
log.warn(
`requestUnsubscribe: Failed to unsubscribe for pubsubTopic:${this.pubsubTopic} from peerId:${peerId.toString()} with error:${response.failure?.error} for contentTopics:${contentTopics}`
);
return false;
}
log.info(
`requestUnsubscribe: Unsubscribed pubsubTopic:${this.pubsubTopic} from peerId:${peerId.toString()} for contentTopics:${contentTopics}`
);
return true;
}
}

View File

@ -1,48 +0,0 @@
export class TTLSet<T> {
private readonly ttlMs: number;
private cleanupIntervalId: number | null = null;
private readonly entryTimestamps = new Map<T, number>();
/**
* Creates a new CustomSet with TTL functionality.
* @param ttlMs - The time-to-live in milliseconds for each entry.
* @param cleanupIntervalMs - Optional interval between cleanup operations (default: 5000ms).
*/
public constructor(ttlMs: number, cleanupIntervalMs: number = 5000) {
this.ttlMs = ttlMs;
this.startCleanupInterval(cleanupIntervalMs);
}
public dispose(): void {
if (this.cleanupIntervalId !== null) {
clearInterval(this.cleanupIntervalId);
this.cleanupIntervalId = null;
}
this.entryTimestamps.clear();
}
public add(entry: T): this {
this.entryTimestamps.set(entry, Date.now());
return this;
}
public has(entry: T): boolean {
return this.entryTimestamps.has(entry);
}
private startCleanupInterval(intervalMs: number): void {
this.cleanupIntervalId = setInterval(() => {
this.removeExpiredEntries();
}, intervalMs) as unknown as number;
}
private removeExpiredEntries(): void {
const now = Date.now();
for (const [entry, timestamp] of this.entryTimestamps.entries()) {
if (now - timestamp > this.ttlMs) {
this.entryTimestamps.delete(entry);
}
}
}
}

View File

@ -10,7 +10,6 @@ import type {
IEncoder,
IFilter,
ILightPush,
INextFilter,
IRelay,
IStore,
IWaku,
@ -22,7 +21,6 @@ import { DefaultNetworkConfig, Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { Filter } from "../filter/index.js";
import { NextFilter } from "../filter_next/index.js";
import { HealthIndicator } from "../health_indicator/index.js";
import { LightPush } from "../light_push/index.js";
import { PeerManager } from "../peer_manager/index.js";
@ -48,7 +46,6 @@ export class WakuNode implements IWaku {
public relay?: IRelay;
public store?: IStore;
public filter?: IFilter;
public nextFilter?: INextFilter;
public lightPush?: ILightPush;
public connectionManager: ConnectionManager;
public health: HealthIndicator;
@ -117,14 +114,6 @@ export class WakuNode implements IWaku {
if (protocolsEnabled.filter) {
this.filter = new Filter({
libp2p,
connectionManager: this.connectionManager,
peerManager: this.peerManager,
lightPush: this.lightPush,
options: options.filter
});
this.nextFilter = new NextFilter({
libp2p,
connectionManager: this.connectionManager,
peerManager: this.peerManager,
@ -192,8 +181,8 @@ export class WakuNode implements IWaku {
}
}
if (_protocols.includes(Protocols.Filter)) {
if (this.nextFilter) {
codecs.push(this.nextFilter.multicodec);
if (this.filter) {
codecs.push(this.filter.multicodec);
} else {
log.error(
"Filter codec not included in dial codec: protocol not mounted locally"

View File

@ -1,340 +0,0 @@
import { LightNode, Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
delay,
runMultipleNodes,
ServiceNodesFleet,
teardownNodesWithRedundancy,
TEST_STRING,
TEST_TIMESTAMPS
} from "../../src/index.js";
import {
messageText,
TestContentTopic,
TestDecoder,
TestEncoder,
TestPubsubTopic,
TestShardInfo
} from "./utils.js";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter Next: FilterPush: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let ctx: Mocha.Context;
beforeEachCustom(this, async () => {
ctx = this.ctx;
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo, {
lightpush: true,
filter: true
});
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
TEST_STRING.forEach((testItem) => {
it(`Check received message containing ${testItem.description}`, async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(testItem.value)
});
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: testItem.value,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
});
TEST_TIMESTAMPS.forEach((testItem) => {
it(`Check received message with timestamp: ${testItem} `, async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: testItem as any
},
TestPubsubTopic
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
checkTimestamp: false,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
// Check if the timestamp matches
const timestamp = serviceNodes.messageCollector.getMessage(0).timestamp;
if (testItem == undefined) {
expect(timestamp).to.eq(undefined);
}
if (timestamp !== undefined && timestamp instanceof Date) {
expect(testItem?.toString()).to.contain(
timestamp.getTime().toString()
);
}
});
});
it("Check message with invalid timestamp is not received", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: "2023-09-06T12:05:38.609Z" as any
},
TestPubsubTopic
);
// Verify that no message was received
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message on other pubsub topic is not received", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
"WrongContentTopic"
);
expect(
await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(false);
});
it("Check message with no pubsub topic is not received", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.nodes[0].restCall<boolean>(
`/relay/v1/messages/`,
"POST",
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
async (res) => res.status === 200
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message with no content topic is not received", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
TestPubsubTopic
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message with no payload is not received", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
timestamp: BigInt(Date.now()) * BigInt(1000000),
payload: undefined as any
},
TestPubsubTopic
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message with non string payload is not received", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
payload: 12345 as unknown as string,
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
TestPubsubTopic
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message received after jswaku node is restarted", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
await waku.stop();
expect(waku.isStarted()).to.eq(false);
await waku.start();
expect(waku.isStarted()).to.eq(true);
for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
}
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
it("Check message received after old nwaku nodes are not available and new are created", async function () {
let callback = serviceNodes.messageCollector.callback;
await waku.nextFilter.subscribe(TestDecoder, (...args) =>
callback(...args)
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
await teardownNodesWithRedundancy(serviceNodes, []);
serviceNodes = await ServiceNodesFleet.createAndRun(
ctx,
2,
false,
TestShardInfo,
{
lightpush: true,
filter: true
},
false
);
callback = serviceNodes.messageCollector.callback;
const peerConnectEvent = new Promise((resolve, reject) => {
waku.libp2p.addEventListener("peer:connect", (e) => {
resolve(e);
});
setTimeout(() => reject, 1000);
});
for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
}
await peerConnectEvent;
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
});
};
[true, false].map(runTests);

View File

@ -1,668 +0,0 @@
import { createDecoder, createEncoder } from "@waku/core";
import { IDecodedMessage, IDecoder, LightNode } from "@waku/interfaces";
import {
ecies,
generatePrivateKey,
generateSymmetricKey,
getPublicKey,
symmetric
} from "@waku/message-encryption";
import { Protocols, utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
delay,
generateTestData,
makeLogFileName,
MessageCollector,
runMultipleNodes,
ServiceNode,
ServiceNodesFleet,
tearDownNodes,
teardownNodesWithRedundancy,
TEST_STRING,
waitForConnections
} from "../../src/index.js";
import {
ClusterId,
messagePayload,
messageText,
ShardIndex,
TestContentTopic,
TestDecoder,
TestEncoder,
TestPubsubTopic,
TestShardInfo
} from "./utils.js";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter Next: Subscribe: Multiple Service Nodes: Strict Check mode: ${strictCheckNodes}`, function () {
this.timeout(100000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestShardInfo,
undefined,
strictCheckNodes
);
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
it("Subscribe and receive messages via lightPush", async function () {
expect(waku.libp2p.getConnections()).has.length(2);
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
await serviceNodes.confirmMessageLength(1);
});
it("Subscribe and receive ecies encrypted messages via lightPush", async function () {
const privateKey = generatePrivateKey();
const publicKey = getPublicKey(privateKey);
const encoder = ecies.createEncoder({
contentTopic: TestContentTopic,
publicKey,
pubsubTopic: TestPubsubTopic
});
const decoder = ecies.createDecoder(
TestContentTopic,
privateKey,
TestPubsubTopic
);
await waku.nextFilter.subscribe(
decoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(encoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedVersion: 1,
expectedPubsubTopic: TestPubsubTopic
});
await serviceNodes.confirmMessageLength(2);
});
it("Subscribe and receive symmetrically encrypted messages via lightPush", async function () {
const symKey = generateSymmetricKey();
const encoder = symmetric.createEncoder({
contentTopic: TestContentTopic,
symKey,
pubsubTopic: TestPubsubTopic
});
const decoder = symmetric.createDecoder(
TestContentTopic,
symKey,
TestPubsubTopic
);
await waku.nextFilter.subscribe(
decoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(encoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedVersion: 1,
expectedPubsubTopic: TestPubsubTopic
});
await serviceNodes.confirmMessageLength(2);
});
it("Subscribe and receive messages via waku relay post", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
// Send a test message using the relay post method.
const relayMessage = ServiceNodesFleet.toMessageRpcQuery({
contentTopic: TestContentTopic,
payload: utf8ToBytes(messageText)
});
await serviceNodes.sendRelayMessage(relayMessage, TestPubsubTopic);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
await serviceNodes.confirmMessageLength(1);
});
it("Subscribe and receive 2 messages on the same topic", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
// Send another message on the same topic.
const newMessageText = "Filtering still works!";
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(newMessageText)
});
// Verify that the second message was successfully received.
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: newMessageText,
expectedContentTopic: TestContentTopic
});
await serviceNodes.confirmMessageLength(2);
});
it("Subscribe and receive messages on 2 different content topics", async function () {
// Subscribe to the first content topic and send a message.
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
// Modify subscription to include a new content topic and send a message.
const newMessageText = "Filtering still works!";
const newMessagePayload = { payload: utf8ToBytes(newMessageText) };
const newContentTopic = "/test/2/waku-filter/default";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.nextFilter.subscribe(
newDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(newEncoder, {
payload: utf8ToBytes(newMessageText)
});
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: newContentTopic,
expectedMessageText: newMessageText,
expectedPubsubTopic: TestPubsubTopic
});
// Send another message on the initial content topic to verify it still works.
await waku.lightPush.send(TestEncoder, newMessagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(2, {
expectedMessageText: newMessageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
await serviceNodes.confirmMessageLength(3);
});
it("Subscribe and receives messages on 20 topics", async function () {
const topicCount = 20;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
// Subscribe to all 20 topics.
for (let i = 0; i < topicCount; i++) {
await waku.nextFilter.subscribe(
td.decoders[i],
serviceNodes.messageCollector.callback
);
}
// Send a unique message on each topic.
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
// Verify that each message was received on the corresponding topic.
expect(await serviceNodes.messageCollector.waitForMessages(20)).to.eq(
true
);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
});
});
// skip for now, will be enabled once old Filter is removed as it exausts amount of streams avaialble
it.skip("Subscribe to 30 topics in separate streams (30 streams for Filter is limit) at once and receives messages", async function () {
this.timeout(100_000);
const topicCount = 30;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
for (let i = 0; i < topicCount; i++) {
await waku.nextFilter.subscribe(
td.decoders[i],
serviceNodes.messageCollector.callback
);
}
// Send a unique message on each topic.
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
// Verify that each message was received on the corresponding topic.
expect(
await serviceNodes.messageCollector.waitForMessages(topicCount)
).to.eq(true);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
});
});
it("Subscribe to 100 topics (new limit) at once and receives messages", async function () {
this.timeout(100_000);
const topicCount = 100;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
await waku.nextFilter.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
// Send a unique message on each topic.
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
// Verify that each message was received on the corresponding topic.
expect(
await serviceNodes.messageCollector.waitForMessages(topicCount)
).to.eq(true);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
});
});
it("Error when try to subscribe to more than 101 topics (new limit)", async function () {
const topicCount = 101;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
try {
await waku.nextFilter.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
} catch (err) {
if (
err instanceof Error &&
err.message.includes(
`exceeds maximum content topics: ${topicCount - 1}`
)
) {
return;
} else {
throw err;
}
}
});
it("Overlapping topic subscription", async function () {
// Define two sets of test data with overlapping topics.
const topicCount1 = 2;
const td1 = generateTestData(topicCount1, {
pubsubTopic: TestPubsubTopic
});
const topicCount2 = 4;
const td2 = generateTestData(topicCount2, {
pubsubTopic: TestPubsubTopic
});
await waku.nextFilter.subscribe(
td1.decoders,
serviceNodes.messageCollector.callback
);
// Subscribe to the second set of topics which has overlapping topics with the first set.
await waku.nextFilter.subscribe(
td2.decoders,
serviceNodes.messageCollector.callback
);
// Send messages to the first set of topics.
for (let i = 0; i < topicCount1; i++) {
const messageText = `Topic Set 1: Message Number: ${i + 1}`;
await waku.lightPush.send(td1.encoders[i], {
payload: utf8ToBytes(messageText)
});
}
// Send messages to the second set of topics.
for (let i = 0; i < topicCount2; i++) {
const messageText = `Topic Set 2: Message Number: ${i + 1}`;
await waku.lightPush.send(td2.encoders[i], {
payload: utf8ToBytes(messageText)
});
}
// Since there are overlapping topics, there should be 10 messages in total because overlaping decoders handle them
expect(
await serviceNodes.messageCollector.waitForMessages(10, { exact: true })
).to.eq(true);
});
it("Refresh subscription", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
// Resubscribe (refresh) to the same topic and send another message.
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm both messages were received.
expect(
await serviceNodes.messageCollector.waitForMessages(2, { exact: true })
).to.eq(true);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
TEST_STRING.forEach((testItem) => {
it(`Subscribe to topic containing ${testItem.description} and receive message`, async function () {
const newContentTopic = testItem.value;
const newEncoder = waku.createEncoder({
contentTopic: newContentTopic,
shardInfo: {
clusterId: ClusterId,
shard: ShardIndex
}
});
const newDecoder = waku.createDecoder({
contentTopic: newContentTopic,
shardInfo: {
clusterId: ClusterId,
shard: ShardIndex
}
});
await waku.nextFilter.subscribe(
newDecoder as IDecoder<IDecodedMessage>,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(newEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: newContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
});
it("Add multiple subscription objects on single nwaku node", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
const newContentTopic = "/test/2/waku-filter/default";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.nextFilter.subscribe(
newDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
// Check if both messages were received
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: newContentTopic,
expectedMessageText: "M2",
expectedPubsubTopic: TestPubsubTopic
});
});
it("Renews subscription after lossing a connection", async function () {
// setup check
expect(waku.libp2p.getConnections()).has.length(2);
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
await serviceNodes.confirmMessageLength(1);
// check renew logic
const nwakuPeers = await Promise.all(
serviceNodes.nodes.map((v) => v.getMultiaddrWithId())
);
await Promise.all(nwakuPeers.map((v) => waku.libp2p.hangUp(v)));
expect(waku.libp2p.getConnections().length).eq(0);
await Promise.all(nwakuPeers.map((v) => waku.libp2p.dial(v)));
await waitForConnections(nwakuPeers.length, waku);
const testText = "second try";
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(testText)
});
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: testText,
expectedContentTopic: TestContentTopic
});
});
it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
// Set up and start a new nwaku node with customPubsubTopic1
const nwaku2 = new ServiceNode(makeLogFileName(this) + "3");
try {
const customContentTopic = "/test/4/waku-filter/default";
const customDecoder = createDecoder(customContentTopic, {
clusterId: ClusterId,
shard: 4
});
const customEncoder = createEncoder({
contentTopic: customContentTopic,
pubsubTopicShardInfo: { clusterId: ClusterId, shard: 4 }
});
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
clusterId: ClusterId,
shard: [4]
});
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
await nwaku2.ensureSubscriptions([customDecoder.pubsubTopic]);
const messageCollector2 = new MessageCollector();
await waku.nextFilter.subscribe(
customDecoder,
messageCollector2.callback
);
// Making sure that messages are send and reveiced for both subscriptions
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
while (
!(await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: TestDecoder.pubsubTopic
})) ||
!(await messageCollector2.waitForMessages(1, {
pubsubTopic: customDecoder.pubsubTopic
}))
) {
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("M1")
});
await waku.lightPush.send(customEncoder, {
payload: utf8ToBytes("M2")
});
}
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: TestDecoder.contentTopic,
expectedPubsubTopic: TestDecoder.pubsubTopic,
expectedMessageText: "M1"
});
messageCollector2.verifyReceivedMessage(0, {
expectedContentTopic: customDecoder.contentTopic,
expectedPubsubTopic: customDecoder.pubsubTopic,
expectedMessageText: "M2"
});
} catch (e) {
await tearDownNodes([nwaku2], []);
}
});
it("Should fail to subscribe with decoder with wrong shard", async function () {
const wrongDecoder = createDecoder(TestDecoder.contentTopic, {
clusterId: ClusterId,
shard: 5
});
// this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2`
try {
await waku.nextFilter.subscribe(
wrongDecoder,
serviceNodes.messageCollector.callback
);
} catch (error) {
expect((error as Error).message).to.include(
`Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.`
);
}
});
});
};
[true, false].map((strictCheckNodes) => runTests(strictCheckNodes));

View File

@ -1,214 +0,0 @@
import { createDecoder, createEncoder } from "@waku/core";
import { type LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
generateTestData,
runMultipleNodes,
ServiceNodesFleet,
teardownNodesWithRedundancy
} from "../../src/index.js";
import {
ClusterId,
messagePayload,
messageText,
TestContentTopic,
TestDecoder,
TestEncoder,
TestPubsubTopic
} from "./utils.js";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter Next: Unsubscribe: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
{
contentTopics: [TestContentTopic],
clusterId: ClusterId
},
{ filter: true, lightpush: true }
);
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
await waku.nextFilter.unsubscribe(TestDecoder);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
false
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
expect(serviceNodes.messageCollector.count).to.eq(1);
await serviceNodes.confirmMessageLength(2);
});
it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
const newContentTopic = "/test/2/waku-filter";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.nextFilter.subscribe(
newDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
// Unsubscribe from the first topic and send again
await waku.nextFilter.unsubscribe(TestDecoder);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") });
expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq(
true
);
// Check that from 4 messages send 3 were received
expect(serviceNodes.messageCollector.count).to.eq(3);
await serviceNodes.confirmMessageLength(4);
});
it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
const newContentTopic = "/test/2/waku-filter";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.nextFilter.subscribe(
newDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
// Unsubscribe from both and send again
await waku.nextFilter.unsubscribe(TestDecoder);
await waku.nextFilter.unsubscribe(newDecoder);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") });
expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq(
false
);
// Check that from 4 messages send 2 were received
expect(serviceNodes.messageCollector.count).to.eq(2);
await serviceNodes.confirmMessageLength(4);
});
it("Unsubscribe topics the node is not subscribed to", async function () {
// Subscribe to 1 topic and send message
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
expect(serviceNodes.messageCollector.count).to.eq(1);
// Unsubscribe from topics that the node is not not subscribed to and send again
await waku.nextFilter.unsubscribe(
createDecoder("/test/2/waku-filter", TestDecoder.pubsubTopic)
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
// Check that both messages were received
expect(serviceNodes.messageCollector.count).to.eq(2);
await serviceNodes.confirmMessageLength(2);
});
it("Unsubscribe from 100 topics (new limit) at once and receives messages", async function () {
this.timeout(100_000);
const topicCount = 100;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
await waku.nextFilter.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
expect(
await serviceNodes.messageCollector.waitForMessages(topicCount)
).to.eq(true);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
});
await waku.nextFilter.unsubscribe(td.decoders);
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
expect(serviceNodes.messageCollector.count).to.eq(100);
});
});
};
[true, false].map(runTests);

View File

@ -1,166 +0,0 @@
import { createDecoder, createEncoder } from "@waku/core";
import {
CreateNodeOptions,
DefaultNetworkConfig,
ISubscription,
IWaku,
LightNode,
NetworkConfig,
Protocols
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import {
contentTopicToPubsubTopic,
contentTopicToShardIndex,
derivePubsubTopicsFromNetworkConfig,
Logger
} from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { Context } from "mocha";
import pRetry from "p-retry";
import {
NOISE_KEY_1,
ServiceNodesFleet,
waitForConnections
} from "../../src/index.js";
// Constants for test configuration.
export const log = new Logger("test:filter");
export const TestContentTopic = "/test/1/waku-filter/default";
export const ClusterId = 2;
export const ShardIndex = contentTopicToShardIndex(TestContentTopic);
export const TestShardInfo = {
contentTopics: [TestContentTopic],
clusterId: ClusterId
};
export const TestPubsubTopic = contentTopicToPubsubTopic(
TestContentTopic,
ClusterId
);
export const TestEncoder = createEncoder({
contentTopic: TestContentTopic,
pubsubTopic: TestPubsubTopic
});
export const TestDecoder = createDecoder(TestContentTopic, TestPubsubTopic);
export const messageText = "Filtering works!";
export const messagePayload = { payload: utf8ToBytes(messageText) };
// Utility to validate errors related to pings in the subscription.
export async function validatePingError(
subscription: ISubscription
): Promise<void> {
try {
const { failures, successes } = await subscription.ping();
if (failures.length === 0 || successes.length > 0) {
throw new Error(
"Ping was successful but was expected to fail with a specific error."
);
}
} catch (err) {
if (
err instanceof Error &&
err.message.includes("peer has no subscriptions")
) {
return;
} else {
throw err;
}
}
}
export async function runMultipleNodes(
context: Context,
networkConfig: NetworkConfig = DefaultNetworkConfig,
strictChecking: boolean = false,
numServiceNodes = 3,
withoutFilter = false
): Promise<[ServiceNodesFleet, LightNode]> {
const pubsubTopics = derivePubsubTopicsFromNetworkConfig(networkConfig);
// create numServiceNodes nodes
const serviceNodes = await ServiceNodesFleet.createAndRun(
context,
numServiceNodes,
strictChecking,
networkConfig,
undefined,
withoutFilter
);
const wakuOptions: CreateNodeOptions = {
staticNoiseKey: NOISE_KEY_1,
libp2p: {
addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] }
}
};
log.info("Starting js waku node with :", JSON.stringify(wakuOptions));
let waku: LightNode | undefined;
try {
waku = await createLightNode(wakuOptions);
await waku.start();
} catch (error) {
log.error("jswaku node failed to start:", error);
}
if (!waku) {
throw new Error("Failed to initialize waku");
}
for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
await node.ensureSubscriptions(pubsubTopics);
const wakuConnections = waku.libp2p.getConnections();
if (wakuConnections.length < 1) {
throw new Error(`Expected at least 1 connection for js-waku.`);
}
await node.waitForLog(waku.libp2p.peerId.toString(), 100);
}
await waitForConnections(numServiceNodes, waku);
return [serviceNodes, waku];
}
export async function teardownNodesWithRedundancy(
serviceNodes: ServiceNodesFleet,
wakuNodes: IWaku | IWaku[]
): Promise<void> {
const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes];
const stopNwakuNodes = serviceNodes.nodes.map(async (node) => {
await pRetry(
async () => {
try {
await node.stop();
} catch (error) {
log.error("Service Node failed to stop:", error);
throw error;
}
},
{ retries: 3 }
);
});
const stopWakuNodes = wNodes.map(async (waku) => {
if (waku) {
await pRetry(
async () => {
try {
await waku.stop();
} catch (error) {
log.error("Waku failed to stop:", error);
throw error;
}
},
{ retries: 3 }
);
}
});
await Promise.all([...stopNwakuNodes, ...stopWakuNodes]);
}

View File

@ -1,141 +0,0 @@
import { ISubscription, LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
runMultipleNodes,
ServiceNodesFleet,
teardownNodesWithRedundancy
} from "../../src/index.js";
import {
TestContentTopic,
TestDecoder,
TestEncoder,
TestShardInfo,
validatePingError
} from "./utils.js";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter V2: Ping: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
beforeEachCustom(this, async () => {
try {
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo, {
lightpush: true,
filter: true
});
} catch (error) {
console.error(error);
}
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
it("Ping on subscribed peer", async function () {
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
// If ping is successfull(node has active subscription) we receive a success status code.
await subscription.ping();
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm new messages are received after a ping.
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
});
it("Ping on peer without subscriptions", async function () {
const { subscription, error } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await subscription.unsubscribe([TestContentTopic]);
await validatePingError(subscription);
});
it("Ping on unsubscribed peer", async function () {
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await subscription.ping();
await subscription.unsubscribe([TestContentTopic]);
// Ping imediately after unsubscribe
await validatePingError(subscription);
});
it("Reopen subscription with peer with lost subscription", async function () {
let subscription: ISubscription;
const openSubscription = async (): Promise<void> => {
const { error, subscription: _subscription } =
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
subscription = _subscription;
};
const unsubscribe = async (): Promise<void> => {
await subscription.unsubscribe([TestContentTopic]);
};
const pingAndReinitiateSubscription = async (): Promise<void> => {
try {
await subscription.ping();
} catch (error) {
if (
error instanceof Error &&
error.message.includes("peer has no subscriptions")
) {
await openSubscription();
} else {
throw error;
}
}
};
// open subscription & ping -> should pass
await openSubscription();
await pingAndReinitiateSubscription();
// unsubscribe & ping -> should fail and reinitiate subscription
await unsubscribe();
await pingAndReinitiateSubscription();
// ping -> should pass as subscription is reinitiated
await pingAndReinitiateSubscription();
});
});
};
[true, false].map(runTests);

View File

@ -23,13 +23,15 @@ import {
} from "./utils.js";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter V2: FilterPush: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
describe(`Waku Filter Next: FilterPush: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let ctx: Mocha.Context;
beforeEachCustom(this, async () => {
ctx = this.ctx;
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo, {
lightpush: true,
filter: true
@ -43,9 +45,10 @@ const runTests = (strictCheckNodes: boolean): void => {
TEST_STRING.forEach((testItem) => {
it(`Check received message containing ${testItem.description}`, async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(testItem.value)
});
@ -64,7 +67,7 @@ const runTests = (strictCheckNodes: boolean): void => {
TEST_TIMESTAMPS.forEach((testItem) => {
it(`Check received message with timestamp: ${testItem} `, async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
@ -103,7 +106,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Check message with invalid timestamp is not received", async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
@ -125,7 +128,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Check message on other pubsub topic is not received", async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
@ -148,7 +151,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Check message with no pubsub topic is not received", async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
@ -171,7 +174,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Check message with no content topic is not received", async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
@ -191,7 +194,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Check message with no payload is not received", async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
@ -212,7 +215,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Check message with non string payload is not received", async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
@ -231,11 +234,9 @@ const runTests = (strictCheckNodes: boolean): void => {
);
});
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
it.skip("Check message received after jswaku node is restarted", async function () {
// Subscribe and send message
it("Check message received after jswaku node is restarted", async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
@ -243,26 +244,23 @@ const runTests = (strictCheckNodes: boolean): void => {
true
);
// Restart js-waku node
await waku.stop();
expect(waku.isStarted()).to.eq(false);
await waku.start();
expect(waku.isStarted()).to.eq(true);
// Redo the connection and create a new subscription
for (const node of this.serviceNodes) {
for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
}
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm both messages were received.
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
@ -278,33 +276,56 @@ const runTests = (strictCheckNodes: boolean): void => {
});
});
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
it.skip("Check message received after nwaku node is restarted", async function () {
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
it("Check message received after old nwaku nodes are not available and new are created", async function () {
let callback = serviceNodes.messageCollector.callback;
await waku.filter.subscribe(TestDecoder, (...args) => callback(...args));
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
// Restart nwaku node
await teardownNodesWithRedundancy(serviceNodes, []);
await serviceNodes.start();
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm both messages were received.
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
await teardownNodesWithRedundancy(serviceNodes, []);
serviceNodes = await ServiceNodesFleet.createAndRun(
ctx,
2,
false,
TestShardInfo,
{
lightpush: true,
filter: true
},
false
);
callback = serviceNodes.messageCollector.callback;
const peerConnectEvent = new Promise((resolve, reject) => {
waku.libp2p.addEventListener("peer:connect", (e) => {
resolve(e);
});
setTimeout(() => reject, 1000);
});
for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
}
await peerConnectEvent;
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2",
expectedContentTopic: TestContentTopic,

View File

@ -1,5 +1,5 @@
import { createDecoder, createEncoder, DecodedMessage } from "@waku/core";
import { IDecoder, LightNode } from "@waku/interfaces";
import { createDecoder, createEncoder } from "@waku/core";
import { IDecodedMessage, IDecoder, LightNode } from "@waku/interfaces";
import {
ecies,
generatePrivateKey,
@ -39,7 +39,7 @@ import {
} from "./utils.js";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter V2: Subscribe: Multiple Service Nodes: Strict Check mode: ${strictCheckNodes}`, function () {
describe(`Waku Filter Next: Subscribe: Multiple Service Nodes: Strict Check mode: ${strictCheckNodes}`, function () {
this.timeout(100000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
@ -61,7 +61,7 @@ const runTests = (strictCheckNodes: boolean): void => {
expect(waku.libp2p.getConnections()).has.length(2);
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
@ -93,7 +93,7 @@ const runTests = (strictCheckNodes: boolean): void => {
);
await waku.filter.subscribe(
[decoder],
decoder,
serviceNodes.messageCollector.callback
);
@ -126,7 +126,7 @@ const runTests = (strictCheckNodes: boolean): void => {
);
await waku.filter.subscribe(
[decoder],
decoder,
serviceNodes.messageCollector.callback
);
@ -147,7 +147,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Subscribe and receive messages via waku relay post", async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
@ -174,7 +174,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Subscribe and receive 2 messages on the same topic", async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
@ -209,7 +209,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Subscribe and receive messages on 2 different content topics", async function () {
// Subscribe to the first content topic and send a message.
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
@ -232,7 +232,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.filter.subscribe(
[newDecoder],
newDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(newEncoder, {
@ -268,7 +268,7 @@ const runTests = (strictCheckNodes: boolean): void => {
// Subscribe to all 20 topics.
for (let i = 0; i < topicCount; i++) {
await waku.filter.subscribe(
[td.decoders[i]],
td.decoders[i],
serviceNodes.messageCollector.callback
);
}
@ -293,6 +293,39 @@ const runTests = (strictCheckNodes: boolean): void => {
});
});
// skiped as it fails in CI but not locally https://github.com/waku-org/js-waku/issues/2438
it.skip("Subscribe to 30 topics in separate streams (30 streams for Filter is limit) at once and receives messages", async function () {
this.timeout(100_000);
const topicCount = 30;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
for (let i = 0; i < topicCount; i++) {
await waku.filter.subscribe(
td.decoders[i],
serviceNodes.messageCollector.callback
);
}
// Send a unique message on each topic.
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
// Verify that each message was received on the corresponding topic.
expect(
await serviceNodes.messageCollector.waitForMessages(topicCount)
).to.eq(true);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
});
});
it("Subscribe to 100 topics (new limit) at once and receives messages", async function () {
this.timeout(100_000);
const topicCount = 100;
@ -328,19 +361,10 @@ const runTests = (strictCheckNodes: boolean): void => {
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
try {
const { error, results } = await waku.filter.subscribe(
await waku.filter.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
const { failures, successes } = results;
if (failures.length === 0 || successes.length > 0) {
throw new Error(
`Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.`
);
}
} catch (err) {
if (
err instanceof Error &&
@ -361,12 +385,12 @@ const runTests = (strictCheckNodes: boolean): void => {
const td1 = generateTestData(topicCount1, {
pubsubTopic: TestPubsubTopic
});
const topicCount2 = 4;
const td2 = generateTestData(topicCount2, {
pubsubTopic: TestPubsubTopic
});
// Subscribe to the first set of topics.
await waku.filter.subscribe(
td1.decoders,
serviceNodes.messageCollector.callback
@ -394,23 +418,22 @@ const runTests = (strictCheckNodes: boolean): void => {
});
}
// Check if all messages were received.
// Since there are overlapping topics, there should be 6 messages in total (2 from the first set + 4 from the second set).
// Since there are overlapping topics, there should be 10 messages in total because overlaping decoders handle them
expect(
await serviceNodes.messageCollector.waitForMessages(6, { exact: true })
await serviceNodes.messageCollector.waitForMessages(10, { exact: true })
).to.eq(true);
});
it("Refresh subscription", async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
// Resubscribe (refresh) to the same topic and send another message.
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
@ -450,7 +473,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});
await waku.filter.subscribe(
[newDecoder as IDecoder<DecodedMessage>],
newDecoder as IDecoder<IDecodedMessage>,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(newEncoder, messagePayload);
@ -468,7 +491,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Add multiple subscription objects on single nwaku node", async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
@ -480,7 +503,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.filter.subscribe(
[newDecoder],
newDecoder,
serviceNodes.messageCollector.callback
);
@ -507,7 +530,7 @@ const runTests = (strictCheckNodes: boolean): void => {
expect(waku.libp2p.getConnections()).has.length(2);
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
@ -550,7 +573,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
@ -582,10 +605,7 @@ const runTests = (strictCheckNodes: boolean): void => {
const messageCollector2 = new MessageCollector();
await waku.filter.subscribe(
[customDecoder],
messageCollector2.callback
);
await waku.filter.subscribe(customDecoder, messageCollector2.callback);
// Making sure that messages are send and reveiced for both subscriptions
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
@ -630,7 +650,7 @@ const runTests = (strictCheckNodes: boolean): void => {
// this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2`
try {
await waku.filter.subscribe(
[wrongDecoder],
wrongDecoder,
serviceNodes.messageCollector.callback
);
} catch (error) {

View File

@ -23,7 +23,7 @@ import {
} from "./utils.js";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter V2: Unsubscribe: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
describe(`Waku Filter Next: Unsubscribe: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
@ -45,26 +45,22 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
await waku.filter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
// Unsubscribe from the topic and send again
await subscription.unsubscribe([TestContentTopic]);
await waku.filter.unsubscribe(TestDecoder);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
false
);
// Check that from 2 messages send only the 1st was received
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
@ -76,13 +72,11 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
await waku.filter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
const newContentTopic = "/test/2/waku-filter";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
@ -90,7 +84,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.filter.subscribe(
[newDecoder],
newDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
@ -100,7 +94,7 @@ const runTests = (strictCheckNodes: boolean): void => {
);
// Unsubscribe from the first topic and send again
await subscription.unsubscribe([TestContentTopic]);
await waku.filter.unsubscribe(TestDecoder);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") });
expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq(
@ -115,7 +109,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
await waku.filter.subscribe(
[TestDecoder],
TestDecoder,
serviceNodes.messageCollector.callback
);
const newContentTopic = "/test/2/waku-filter";
@ -124,13 +118,11 @@ const runTests = (strictCheckNodes: boolean): void => {
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
const { error, subscription } = await waku.filter.subscribe(
[newDecoder],
await waku.filter.subscribe(
newDecoder,
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
@ -138,7 +130,8 @@ const runTests = (strictCheckNodes: boolean): void => {
);
// Unsubscribe from both and send again
await subscription.unsubscribe([TestContentTopic, newContentTopic]);
await waku.filter.unsubscribe(TestDecoder);
await waku.filter.unsubscribe(newDecoder);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") });
expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq(
@ -152,13 +145,11 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Unsubscribe topics the node is not subscribed to", async function () {
// Subscribe to 1 topic and send message
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
await waku.filter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
@ -167,8 +158,9 @@ const runTests = (strictCheckNodes: boolean): void => {
expect(serviceNodes.messageCollector.count).to.eq(1);
// Unsubscribe from topics that the node is not not subscribed to and send again
await subscription.unsubscribe([]);
await subscription.unsubscribe(["/test/2/waku-filter"]);
await waku.filter.unsubscribe(
createDecoder("/test/2/waku-filter", TestDecoder.pubsubTopic)
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
@ -179,66 +171,42 @@ const runTests = (strictCheckNodes: boolean): void => {
await serviceNodes.confirmMessageLength(2);
});
it("Unsubscribes all - node subscribed to 1 topic", async function () {
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
expect(serviceNodes.messageCollector.count).to.eq(1);
// Unsubscribe from all topics and send again
await subscription.unsubscribeAll();
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
false
);
// Check that from 2 messages send only the 1st was received
expect(serviceNodes.messageCollector.count).to.eq(1);
await serviceNodes.confirmMessageLength(2);
});
it("Unsubscribes all - node subscribed to 10 topics", async function () {
// Subscribe to 10 topics and send message
const topicCount = 10;
it("Unsubscribe from 100 topics (new limit) at once and receives messages", async function () {
this.timeout(100_000);
const topicCount = 100;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
const { error, subscription } = await waku.filter.subscribe(
await waku.filter.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`M${i + 1}`)
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
expect(await serviceNodes.messageCollector.waitForMessages(10)).to.eq(
true
);
// Unsubscribe from all topics and send again
await subscription.unsubscribeAll();
expect(
await serviceNodes.messageCollector.waitForMessages(topicCount)
).to.eq(true);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
});
await waku.filter.unsubscribe(td.decoders);
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`M${topicCount + i + 1}`)
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
expect(await serviceNodes.messageCollector.waitForMessages(11)).to.eq(
false
);
// Check that from 20 messages send only 10 were received
expect(serviceNodes.messageCollector.count).to.eq(10);
await serviceNodes.confirmMessageLength(20);
expect(serviceNodes.messageCollector.count).to.eq(100);
});
});
};

View File

@ -2,7 +2,6 @@ import { createDecoder, createEncoder } from "@waku/core";
import {
CreateNodeOptions,
DefaultNetworkConfig,
ISubscription,
IWaku,
LightNode,
NetworkConfig,
@ -46,29 +45,6 @@ export const TestDecoder = createDecoder(TestContentTopic, TestPubsubTopic);
export const messageText = "Filtering works!";
export const messagePayload = { payload: utf8ToBytes(messageText) };
// Utility to validate errors related to pings in the subscription.
export async function validatePingError(
subscription: ISubscription
): Promise<void> {
try {
const { failures, successes } = await subscription.ping();
if (failures.length === 0 || successes.length > 0) {
throw new Error(
"Ping was successful but was expected to fail with a specific error."
);
}
} catch (err) {
if (
err instanceof Error &&
err.message.includes("peer has no subscriptions")
) {
return;
} else {
throw err;
}
}
}
export async function runMultipleNodes(
context: Context,
networkConfig: NetworkConfig = DefaultNetworkConfig,

View File

@ -1,3 +1,6 @@
/*
TODO(weboko): skipped until https://github.com/waku-org/js-waku/issues/2431 is resolved
import { createDecoder, createEncoder } from "@waku/core";
import { type LightNode } from "@waku/interfaces";
import { toAsyncIterator } from "@waku/utils";
@ -105,3 +108,4 @@ describe("Util: toAsyncIterator: Filter", function () {
expect(result.done).to.eq(true);
});
});
*/