feat: event based approach to Filter (#2300)

* create new filter api

* implement await on main methods on new Filter

* add info logs in new filter

* add logs to subscription impl

* remove lint supress

* add unit tests

* introduce E2E tests

* update e2e tests and add case for testing filter recovery after nwaku nodes replacement

* add new test cases for max limits and enable decoders as array on new filter

* fix edge case testing, correct test cases

* skip test

* update error message

* up text

* up text

* fix lint

* implement unsubscribeAll

* add js-dock to new filter

* add cspell

* implement TTL set for message history
This commit is contained in:
Sasha 2025-05-28 00:44:44 +02:00 committed by GitHub
parent f85a476261
commit a4dfd3455c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 3004 additions and 6 deletions

View File

@ -40,7 +40,9 @@
"Encrypters", "Encrypters",
"enr", "enr",
"enrs", "enrs",
"unsubscription",
"enrtree", "enrtree",
"unhandle",
"ephem", "ephem",
"esnext", "esnext",
"ethersproject", "ethersproject",

View File

@ -30,18 +30,45 @@ export const FilterCodecs = {
PUSH: "/vac/waku/filter-push/2.0.0-beta1" PUSH: "/vac/waku/filter-push/2.0.0-beta1"
}; };
type IncomingMessageHandler = (
pubsubTopic: PubsubTopic,
wakuMessage: WakuMessage,
peerIdStr: string
) => Promise<void>;
export class FilterCore extends BaseProtocol implements IBaseProtocolCore { export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
private static handleIncomingMessage?: IncomingMessageHandler;
public constructor( public constructor(
private handleIncomingMessage: ( handleIncomingMessage: IncomingMessageHandler,
pubsubTopic: PubsubTopic,
wakuMessage: WakuMessage,
peerIdStr: string
) => Promise<void>,
public readonly pubsubTopics: PubsubTopic[], public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p libp2p: Libp2p
) { ) {
super(FilterCodecs.SUBSCRIBE, libp2p.components, pubsubTopics); super(FilterCodecs.SUBSCRIBE, libp2p.components, pubsubTopics);
// TODO(weboko): remove when @waku/sdk 0.0.33 is released
const prevHandler = FilterCore.handleIncomingMessage;
FilterCore.handleIncomingMessage = !prevHandler
? handleIncomingMessage
: async (pubsubTopic, message, peerIdStr): Promise<void> => {
try {
await prevHandler(pubsubTopic, message, peerIdStr);
} catch (e) {
log.error(
"Previous FilterCore incoming message handler failed ",
e
);
}
try {
await handleIncomingMessage(pubsubTopic, message, peerIdStr);
} catch (e) {
log.error("Present FilterCore incoming message handler failed ", e);
}
return;
};
libp2p libp2p
.handle(FilterCodecs.PUSH, this.onRequest.bind(this), { .handle(FilterCodecs.PUSH, this.onRequest.bind(this), {
maxInboundStreams: 100 maxInboundStreams: 100
@ -291,7 +318,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
return; return;
} }
await this.handleIncomingMessage( await FilterCore.handleIncomingMessage?.(
pubsubTopic, pubsubTopic,
wakuMessage, wakuMessage,
connection.remotePeer.toString() connection.remotePeer.toString()

View File

@ -0,0 +1,96 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { Callback } from "./protocols.js";
export type INextFilter = {
/**
* Subscribes to messages with specified decoders and executes callback when a message is received.
* In case no peers available initially - will delay subscription till connects to any peer.
*
* @param decoders - Single decoder or array of decoders to subscribe to. All decoders must share the same pubsubTopic.
* @param callback - Function called when a message matching the decoder's contentTopic is received.
* @returns Promise that resolves to true if subscription was successful, false otherwise.
*
* @example
* // Subscribe to a single content topic
* await filter.subscribe(decoder, (msg) => console.log(msg));
*
* @example
* // Subscribe to multiple content topics with the same pubsub topic
* await filter.subscribe([decoder1, decoder2], (msg) => console.log(msg));
*
* @example
* // Handle subscription failure
* const success = await filter.subscribe(decoder, handleMessage);
* if (!success) {
* console.error("Failed to subscribe");
* }
*/
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<boolean>;
/**
* Unsubscribes from messages with specified decoders.
*
* @param decoders - Single decoder or array of decoders to unsubscribe from. All decoders must share the same pubsubTopic.
* @returns Promise that resolves to true if unsubscription was successful, false otherwise.
*
* @example
* // Unsubscribe from a single decoder
* await filter.unsubscribe(decoder);
*
* @example
* // Unsubscribe from multiple decoders at once
* await filter.unsubscribe([decoder1, decoder2]);
*
* @example
* // Handle unsubscription failure
* const success = await filter.unsubscribe(decoder);
* if (!success) {
* console.error("Failed to unsubscribe");
* }
*/
unsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<boolean>;
/**
* Unsubscribes from all active subscriptions across all pubsub topics.
*
* @example
* // Clean up all subscriptions when React component unmounts
* useEffect(() => {
* return () => filter.unsubscribeAll();
* }, [filter]);
*
* @example
* // Reset subscriptions and start over
* filter.unsubscribeAll();
* await filter.subscribe(newDecoder, newCallback);
*/
unsubscribeAll(): void;
};
export type NextFilterOptions = {
/**
* Interval with which Filter subscription will attempt to send ping requests to subscribed peers.
*
* @default 60_000
*/
keepAliveIntervalMs: number;
/**
* Number of failed pings allowed to make to a remote peer before attempting to subscribe to a new one.
*
* @default 3
*/
pingsBeforePeerRenewed: number;
/**
* Number of peers to be used for establishing subscriptions.
*
* @default 2
*/
numPeersToUse: number;
};

View File

@ -1,5 +1,6 @@
export * from "./enr.js"; export * from "./enr.js";
export * from "./filter.js"; export * from "./filter.js";
export * from "./filter_next.js";
export * from "./light_push.js"; export * from "./light_push.js";
export * from "./message.js"; export * from "./message.js";
export * from "./peer_exchange.js"; export * from "./peer_exchange.js";

View File

@ -3,6 +3,7 @@ import type { MultiaddrInput } from "@multiformats/multiaddr";
import type { IConnectionManager } from "./connection_manager.js"; import type { IConnectionManager } from "./connection_manager.js";
import type { IFilter } from "./filter.js"; import type { IFilter } from "./filter.js";
import type { INextFilter } from "./filter_next.js";
import type { IHealthIndicator } from "./health_indicator.js"; import type { IHealthIndicator } from "./health_indicator.js";
import type { Libp2p } from "./libp2p.js"; import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js"; import type { ILightPush } from "./light_push.js";
@ -34,7 +35,13 @@ export interface IWaku {
libp2p: Libp2p; libp2p: Libp2p;
relay?: IRelay; relay?: IRelay;
store?: IStore; store?: IStore;
/**
* @deprecated use IWaku.nextFilter instead
*/
filter?: IFilter; filter?: IFilter;
nextFilter?: INextFilter;
lightPush?: ILightPush; lightPush?: ILightPush;
connectionManager: IConnectionManager; connectionManager: IConnectionManager;
health: IHealthIndicator; health: IHealthIndicator;
@ -210,6 +217,7 @@ export interface LightNode extends IWaku {
relay: undefined; relay: undefined;
store: IStore; store: IStore;
filter: IFilter; filter: IFilter;
nextFilter: INextFilter;
lightPush: ILightPush; lightPush: ILightPush;
} }

View File

@ -0,0 +1,205 @@
import { ConnectionManager, createDecoder } from "@waku/core";
import type {
IDecodedMessage,
IDecoder,
IProtoMessage,
Libp2p
} from "@waku/interfaces";
import { expect } from "chai";
import sinon from "sinon";
import { PeerManager } from "../peer_manager/index.js";
import { Filter } from "./filter.js";
import { Subscription } from "./subscription.js";
const PUBSUB_TOPIC = "/waku/2/rs/1/4";
const CONTENT_TOPIC = "/test/1/waku-filter/utf8";
describe("Filter SDK", () => {
let libp2p: Libp2p;
let filter: Filter;
let decoder: IDecoder<IDecodedMessage>;
let callback: sinon.SinonSpy;
let connectionManager: ConnectionManager;
let peerManager: PeerManager;
beforeEach(() => {
libp2p = mockLibp2p();
connectionManager = mockConnectionManager();
peerManager = mockPeerManager();
filter = mockFilter({ libp2p, connectionManager, peerManager });
decoder = createDecoder(CONTENT_TOPIC, PUBSUB_TOPIC);
callback = sinon.spy();
});
afterEach(() => {
sinon.restore();
});
it("should throw error when subscribing with unsupported pubsub topic", async () => {
const unsupportedDecoder = createDecoder(
CONTENT_TOPIC,
"/unsupported/topic"
);
try {
await filter.subscribe(unsupportedDecoder, callback);
expect.fail("Should have thrown an error");
} catch (error) {
expect((error as Error).message).to.include(
"Pubsub topic /unsupported/topic has not been configured on this instance."
);
}
});
it("should successfully subscribe to supported pubsub topic", async () => {
const addStub = sinon.stub(Subscription.prototype, "add").resolves(true);
const startStub = sinon.stub(Subscription.prototype, "start");
const result = await filter.subscribe(decoder, callback);
expect(result).to.be.true;
expect(addStub.calledOnce).to.be.true;
expect(startStub.calledOnce).to.be.true;
});
it("should throw error when unsubscribing with unsupported pubsub topic", async () => {
const unsupportedDecoder = createDecoder(
CONTENT_TOPIC,
"/unsupported/topic"
);
try {
await filter.unsubscribe(unsupportedDecoder);
expect.fail("Should have thrown an error");
} catch (error) {
expect((error as Error).message).to.include(
"Pubsub topic /unsupported/topic has not been configured on this instance."
);
}
});
it("should return false when unsubscribing from a non-existing subscription", async () => {
const result = await filter.unsubscribe(decoder);
expect(result).to.be.false;
});
it("should successfully unsubscribe from an existing subscription", async () => {
sinon.stub(Subscription.prototype, "add").resolves(true);
sinon.stub(Subscription.prototype, "start");
await filter.subscribe(decoder, callback);
const removeStub = sinon
.stub(Subscription.prototype, "remove")
.resolves(true);
const isEmptyStub = sinon
.stub(Subscription.prototype, "isEmpty")
.returns(true);
const stopStub = sinon.stub(Subscription.prototype, "stop");
const result = await filter.unsubscribe(decoder);
expect(result).to.be.true;
expect(removeStub.calledOnce).to.be.true;
expect(isEmptyStub.calledOnce).to.be.true;
expect(stopStub.calledOnce).to.be.true;
});
it("should handle incoming messages", async () => {
const subscriptionInvokeStub = sinon.stub(Subscription.prototype, "invoke");
sinon.stub(Subscription.prototype, "add").resolves(true);
await filter.subscribe(decoder, callback);
const message = createMockMessage(CONTENT_TOPIC);
const peerId = "peer1";
await (filter as any).onIncomingMessage(PUBSUB_TOPIC, message, peerId);
expect(subscriptionInvokeStub.calledOnce).to.be.true;
expect(subscriptionInvokeStub.firstCall.args[0]).to.equal(message);
expect(subscriptionInvokeStub.firstCall.args[1]).to.equal(peerId);
});
it("should successfully stop", async () => {
const decoder2 = createDecoder("/another-content-topic", PUBSUB_TOPIC);
const stopStub = sinon.stub(Subscription.prototype, "stop");
sinon.stub(Subscription.prototype, "add").resolves(true);
sinon.stub(Subscription.prototype, "start");
await filter.subscribe(decoder, callback);
await filter.subscribe(decoder2, callback);
filter.unsubscribeAll();
expect(stopStub.calledOnce).to.be.true;
const result = await filter.unsubscribe(decoder);
expect(result).to.be.false;
});
});
function mockLibp2p(): Libp2p {
return {
addEventListener: sinon.stub(),
removeEventListener: sinon.stub(),
handle: sinon.stub().resolves(),
components: {
events: {
addEventListener: sinon.stub(),
removeEventListener: sinon.stub()
},
connectionManager: {
getConnections: sinon.stub().returns([])
}
}
} as unknown as Libp2p;
}
function mockConnectionManager(): ConnectionManager {
return {
pubsubTopics: [PUBSUB_TOPIC]
} as ConnectionManager;
}
function mockPeerManager(): PeerManager {
return {
getPeers: sinon.stub().returns([])
} as unknown as PeerManager;
}
type MockFilterOptions = {
libp2p: Libp2p;
connectionManager?: ConnectionManager;
peerManager?: PeerManager;
};
function mockFilter(options: MockFilterOptions): Filter {
const filter = new Filter({
libp2p: options.libp2p,
connectionManager: options.connectionManager || mockConnectionManager(),
peerManager: options.peerManager || mockPeerManager(),
options: {
numPeersToUse: 2,
pingsBeforePeerRenewed: 3,
keepAliveIntervalMs: 60_000
}
});
// we're not actually testing FilterCore functionality here
return filter;
}
function createMockMessage(contentTopic: string): IProtoMessage {
return {
payload: new Uint8Array(),
contentTopic,
version: 0,
timestamp: BigInt(Date.now()),
meta: undefined,
rateLimitProof: undefined,
ephemeral: false
};
}

View File

@ -0,0 +1,255 @@
import { ConnectionManager, FilterCore } from "@waku/core";
import type {
Callback,
NextFilterOptions as FilterOptions,
IDecodedMessage,
IDecoder,
INextFilter as IFilter,
Libp2p
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
import { PeerManager } from "../peer_manager/index.js";
import { Subscription } from "./subscription.js";
import { FilterConstructorParams } from "./types.js";
const log = new Logger("sdk:next-filter");
type PubsubTopic = string;
export class Filter implements IFilter {
private readonly libp2p: Libp2p;
private readonly protocol: FilterCore;
private readonly peerManager: PeerManager;
private readonly connectionManager: ConnectionManager;
private readonly config: FilterOptions;
private subscriptions = new Map<PubsubTopic, Subscription>();
public constructor(params: FilterConstructorParams) {
this.config = {
numPeersToUse: 2,
pingsBeforePeerRenewed: 3,
keepAliveIntervalMs: 60_000,
...params.options
};
this.libp2p = params.libp2p;
this.peerManager = params.peerManager;
this.connectionManager = params.connectionManager;
this.protocol = new FilterCore(
this.onIncomingMessage.bind(this),
params.connectionManager.pubsubTopics,
params.libp2p
);
}
/**
* Unsubscribes from all active subscriptions across all pubsub topics.
*
* @example
* // Clean up all subscriptions when React component unmounts
* useEffect(() => {
* return () => filter.unsubscribeAll();
* }, [filter]);
*
* @example
* // Reset subscriptions and start over
* filter.unsubscribeAll();
* await filter.subscribe(newDecoder, newCallback);
*/
public unsubscribeAll(): void {
for (const subscription of this.subscriptions.values()) {
subscription.stop();
}
this.subscriptions.clear();
}
/**
* Subscribes to messages with specified decoders and executes callback when a message is received.
* In case no peers available initially - will delay subscription till connects to any peer.
*
* @param decoders - Single decoder or array of decoders to subscribe to. All decoders must share the same pubsubTopic.
* @param callback - Function called when a message matching the decoder's contentTopic is received.
* @returns Promise that resolves to true if subscription was successful, false otherwise.
*
* @example
* // Subscribe to a single content topic
* await filter.subscribe(decoder, (msg) => console.log(msg));
*
* @example
* // Subscribe to multiple content topics with the same pubsub topic
* await filter.subscribe([decoder1, decoder2], (msg) => console.log(msg));
*
* @example
* // Handle subscription failure
* const success = await filter.subscribe(decoder, handleMessage);
* if (!success) {
* console.error("Failed to subscribe");
* }
*/
public async subscribe<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
if (decoders.length === 0) {
throw Error("Cannot subscribe with 0 decoders.");
}
const pubsubTopics = decoders.map((v) => v.pubsubTopic);
const contentTopics = decoders.map((v) => v.contentTopic);
// doing this for simplicity, we can enable subscription for more than one PubsubTopic at once later when requested
if (!this.isSamePubsubTopic(decoders)) {
throw Error(
`Cannot subscribe to more than one pubsub topic at the same time, got pubsubTopics:${pubsubTopics}`
);
}
log.info(
`Subscribing to content topic: ${contentTopics}, pubsub topic: ${pubsubTopics}`
);
const supportedPubsubTopic = this.connectionManager.pubsubTopics.includes(
pubsubTopics[0]
);
if (!supportedPubsubTopic) {
throw Error(
`Pubsub topic ${pubsubTopics[0]} has not been configured on this instance.`
);
}
let subscription = this.subscriptions.get(pubsubTopics[0]);
if (!subscription) {
subscription = new Subscription({
pubsubTopic: pubsubTopics[0],
libp2p: this.libp2p,
protocol: this.protocol,
config: this.config,
peerManager: this.peerManager
});
subscription.start();
}
const result = await subscription.add(decoders, callback);
this.subscriptions.set(pubsubTopics[0], subscription);
log.info(
`Subscription ${result ? "successful" : "failed"} for content topic: ${contentTopics}`
);
return result;
}
/**
* Unsubscribes from messages with specified decoders.
*
* @param decoders - Single decoder or array of decoders to unsubscribe from. All decoders must share the same pubsubTopic.
* @returns Promise that resolves to true if unsubscription was successful, false otherwise.
*
* @example
* // Unsubscribe from a single decoder
* await filter.unsubscribe(decoder);
*
* @example
* // Unsubscribe from multiple decoders at once
* await filter.unsubscribe([decoder1, decoder2]);
*
* @example
* // Handle unsubscription failure
* const success = await filter.unsubscribe(decoder);
* if (!success) {
* console.error("Failed to unsubscribe");
* }
*/
public async unsubscribe<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[]
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
if (decoders.length === 0) {
throw Error("Cannot unsubscribe with 0 decoders.");
}
const pubsubTopics = decoders.map((v) => v.pubsubTopic);
const contentTopics = decoders.map((v) => v.contentTopic);
// doing this for simplicity, we can enable unsubscribing with more than one PubsubTopic at once later when requested
if (!this.isSamePubsubTopic(decoders)) {
throw Error(
`Cannot unsubscribe with more than one pubsub topic at the same time, got pubsubTopics:${pubsubTopics}`
);
}
log.info(
`Unsubscribing from content topic: ${contentTopics}, pubsub topic: ${pubsubTopics}`
);
const supportedPubsubTopic = this.connectionManager.pubsubTopics.includes(
pubsubTopics[0]
);
if (!supportedPubsubTopic) {
throw Error(
`Pubsub topic ${pubsubTopics[0]} has not been configured on this instance.`
);
}
const subscription = this.subscriptions.get(pubsubTopics[0]);
if (!subscription) {
log.warn("No subscriptions associated with the decoder.");
return false;
}
const result = await subscription.remove(decoders);
if (subscription.isEmpty()) {
log.warn("Subscription has no decoders anymore, terminating it.");
subscription.stop();
this.subscriptions.delete(pubsubTopics[0]);
}
log.info(
`Unsubscribing ${result ? "successful" : "failed"} for content topic: ${contentTopics}`
);
return result;
}
private async onIncomingMessage(
pubsubTopic: string,
message: WakuMessage,
peerId: string
): Promise<void> {
log.info(
`Received message for pubsubTopic:${pubsubTopic}, contentTopic:${message.contentTopic}, peerId:${peerId.toString()}`
);
const subscription = this.subscriptions.get(pubsubTopic);
if (!subscription) {
log.error(`No subscription locally registered for topic ${pubsubTopic}`);
return;
}
subscription.invoke(message, peerId);
}
private isSamePubsubTopic<T extends IDecodedMessage>(
decoders: IDecoder<T>[]
): boolean {
const topics = new Set<string>();
for (const decoder of decoders) {
topics.add(decoder.pubsubTopic);
}
return topics.size === 1;
}
}

View File

@ -0,0 +1 @@
export { Filter as NextFilter } from "./filter.js";

View File

@ -0,0 +1,239 @@
import type { PeerId } from "@libp2p/interface";
import { FilterCore } from "@waku/core";
import type {
IDecodedMessage,
IDecoder,
Libp2p,
NextFilterOptions
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { expect } from "chai";
import sinon from "sinon";
import { PeerManager } from "../peer_manager/index.js";
import { Subscription } from "./subscription.js";
const PUBSUB_TOPIC = "/waku/2/rs/1/4";
const CONTENT_TOPIC = "/test/1/waku-filter/utf8";
describe("Filter Subscription", () => {
let libp2p: Libp2p;
let filterCore: FilterCore;
let peerManager: PeerManager;
let subscription: Subscription;
let decoder: IDecoder<IDecodedMessage>;
let config: NextFilterOptions;
beforeEach(() => {
libp2p = mockLibp2p();
filterCore = mockFilterCore();
peerManager = mockPeerManager();
config = {
numPeersToUse: 2,
pingsBeforePeerRenewed: 3,
keepAliveIntervalMs: 60_000
};
subscription = new Subscription({
pubsubTopic: PUBSUB_TOPIC,
libp2p,
protocol: filterCore,
config,
peerManager
});
decoder = mockDecoder();
});
afterEach(() => {
sinon.restore();
});
it("should be empty when created", () => {
expect(subscription.isEmpty()).to.be.true;
});
it("should not be empty after adding a subscription", async () => {
const attemptSubscribeSpy = sinon
.stub(subscription as any, "attemptSubscribe")
.resolves(true);
const callback = sinon.spy();
await subscription.add(decoder, callback);
expect(subscription.isEmpty()).to.be.false;
expect(attemptSubscribeSpy.calledOnce).to.be.true;
});
it("should be empty after removing the only subscription", async () => {
const attemptSubscribeSpy = sinon
.stub(subscription as any, "attemptSubscribe")
.resolves(true);
const attemptUnsubscribeSpy = sinon
.stub(subscription as any, "attemptUnsubscribe")
.resolves(true);
const callback = sinon.spy();
await subscription.add(decoder, callback);
await subscription.remove(decoder);
expect(subscription.isEmpty()).to.be.true;
expect(attemptSubscribeSpy.calledOnce).to.be.true;
expect(attemptUnsubscribeSpy.calledOnce).to.be.true;
});
it("should invoke callbacks when receiving a message", async () => {
const testContentTopic = "/custom/content/topic";
const testDecoder = {
pubsubTopic: PUBSUB_TOPIC,
contentTopic: testContentTopic,
fromProtoObj: sinon.stub().callsFake(() => {
return Promise.resolve({ payload: new Uint8Array([1, 2, 3]) });
})
};
const callback = sinon.spy();
const message = {
contentTopic: testContentTopic
} as WakuMessage;
sinon.stub(subscription as any, "attemptSubscribe").resolves(true);
await subscription.add(testDecoder as any, callback);
subscription.invoke(message, "peer1");
await new Promise((resolve) => setTimeout(resolve, 50));
expect(callback.called).to.be.true;
expect(testDecoder.fromProtoObj.called).to.be.true;
expect(callback.callCount).to.eq(1);
});
it("should invoke callbacks only when newly receiving message is given", async () => {
const testContentTopic = "/custom/content/topic";
const testDecoder = {
pubsubTopic: PUBSUB_TOPIC,
contentTopic: testContentTopic,
fromProtoObj: sinon.stub().callsFake(() => {
return Promise.resolve({ payload: new Uint8Array([1, 2, 3]) });
})
};
const callback = sinon.spy();
const message = {
contentTopic: testContentTopic
} as WakuMessage;
sinon.stub(subscription as any, "attemptSubscribe").resolves(true);
await subscription.add(testDecoder as any, callback);
subscription.invoke(message, "peer1");
await new Promise((resolve) => setTimeout(resolve, 50));
subscription.invoke(message, "peer2");
await new Promise((resolve) => setTimeout(resolve, 50));
expect(callback.called).to.be.true;
expect(testDecoder.fromProtoObj.called).to.be.true;
expect(callback.callCount).to.eq(1);
});
it("should start and setup intervals and event listeners", () => {
const attemptSubscribeSpy = sinon
.stub(subscription as any, "attemptSubscribe")
.resolves(true);
const setupSubscriptionIntervalSpy = sinon.spy(
subscription as any,
"setupSubscriptionInterval"
);
const setupKeepAliveIntervalSpy = sinon.spy(
subscription as any,
"setupKeepAliveInterval"
);
const setupEventListenersSpy = sinon.spy(
subscription as any,
"setupEventListeners"
);
subscription.start();
expect(attemptSubscribeSpy.calledOnce).to.be.true;
expect(setupSubscriptionIntervalSpy.calledOnce).to.be.true;
expect(setupKeepAliveIntervalSpy.calledOnce).to.be.true;
expect(setupEventListenersSpy.calledOnce).to.be.true;
});
it("should stop and cleanup resources", () => {
const disposeEventListenersSpy = sinon.spy(
subscription as any,
"disposeEventListeners"
);
const disposeIntervalsSpy = sinon.spy(
subscription as any,
"disposeIntervals"
);
const disposePeersSpy = sinon
.stub(subscription as any, "disposePeers")
.resolves();
const disposeHandlersSpy = sinon.spy(
subscription as any,
"disposeHandlers"
);
sinon.stub(subscription as any, "attemptSubscribe").resolves(true);
subscription.start();
subscription.stop();
expect(disposeEventListenersSpy.calledOnce).to.be.true;
expect(disposeIntervalsSpy.calledOnce).to.be.true;
expect(disposePeersSpy.calledOnce).to.be.true;
expect(disposeHandlersSpy.calledOnce).to.be.true;
});
});
function mockLibp2p(): Libp2p {
return {
addEventListener: sinon.stub(),
removeEventListener: sinon.stub(),
handle: sinon.stub().resolves(),
components: {
events: {
addEventListener: sinon.stub(),
removeEventListener: sinon.stub()
},
connectionManager: {
getConnections: sinon.stub().returns([])
}
}
} as unknown as Libp2p;
}
function mockFilterCore(): FilterCore {
return {
subscribe: sinon.stub().resolves(true),
unsubscribe: sinon.stub().resolves(true),
ping: sinon.stub().resolves(true)
} as unknown as FilterCore;
}
function mockPeerManager(): PeerManager {
return {
getPeers: sinon.stub().returns([mockPeerId("peer1"), mockPeerId("peer2")])
} as unknown as PeerManager;
}
function mockPeerId(id: string): PeerId {
return {
toString: () => id
} as unknown as PeerId;
}
function mockDecoder(): IDecoder<IDecodedMessage> {
return {
pubsubTopic: PUBSUB_TOPIC,
contentTopic: CONTENT_TOPIC,
fromProtoObj: sinon.stub().resolves(undefined)
} as unknown as IDecoder<IDecodedMessage>;
}

View File

@ -0,0 +1,593 @@
import {
type EventHandler,
type PeerId,
TypedEventEmitter
} from "@libp2p/interface";
import { FilterCore } from "@waku/core";
import type {
Callback,
NextFilterOptions as FilterOptions,
IDecodedMessage,
IDecoder,
IProtoMessage,
Libp2p
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
import { PeerManager } from "../peer_manager/index.js";
import { SubscriptionEvents, SubscriptionParams } from "./types.js";
import { TTLSet } from "./utils.js";
const log = new Logger("sdk:filter-subscription");
type AttemptSubscribeParams = {
useNewContentTopics: boolean;
useOnlyNewPeers?: boolean;
};
type AttemptUnsubscribeParams = {
useNewContentTopics: boolean;
};
export class Subscription {
private readonly libp2p: Libp2p;
private readonly pubsubTopic: string;
private readonly protocol: FilterCore;
private readonly peerManager: PeerManager;
private readonly config: FilterOptions;
private isStarted: boolean = false;
private inProgress: boolean = false;
private peers = new Set<PeerId>();
private peerFailures = new Map<PeerId, number>();
private readonly receivedMessages = new TTLSet<string>(60_000);
private callbacks = new Map<
IDecoder<IDecodedMessage>,
EventHandler<CustomEvent<WakuMessage>>
>();
private messageEmitter = new TypedEventEmitter<SubscriptionEvents>();
private toSubscribeContentTopics = new Set<string>();
private toUnsubscribeContentTopics = new Set<string>();
private subscribeIntervalId: number | null = null;
private keepAliveIntervalId: number | null = null;
private get contentTopics(): string[] {
const allTopics = Array.from(this.callbacks.keys()).map(
(k) => k.contentTopic
);
const uniqueTopics = new Set(allTopics).values();
return Array.from(uniqueTopics);
}
public constructor(params: SubscriptionParams) {
this.config = params.config;
this.pubsubTopic = params.pubsubTopic;
this.libp2p = params.libp2p;
this.protocol = params.protocol;
this.peerManager = params.peerManager;
this.onPeerConnected = this.onPeerConnected.bind(this);
this.onPeerDisconnected = this.onPeerDisconnected.bind(this);
}
public start(): void {
log.info(`Starting subscription for pubsubTopic: ${this.pubsubTopic}`);
if (this.isStarted || this.inProgress) {
log.info("Subscription already started or in progress, skipping start");
return;
}
this.inProgress = true;
void this.attemptSubscribe({
useNewContentTopics: false
});
this.setupSubscriptionInterval();
this.setupKeepAliveInterval();
this.setupEventListeners();
this.isStarted = true;
this.inProgress = false;
log.info(`Subscription started for pubsubTopic: ${this.pubsubTopic}`);
}
public stop(): void {
log.info(`Stopping subscription for pubsubTopic: ${this.pubsubTopic}`);
if (!this.isStarted || this.inProgress) {
log.info("Subscription not started or stop in progress, skipping stop");
return;
}
this.inProgress = true;
this.disposeEventListeners();
this.disposeIntervals();
void this.disposePeers();
this.disposeHandlers();
this.receivedMessages.dispose();
this.inProgress = false;
this.isStarted = false;
log.info(`Subscription stopped for pubsubTopic: ${this.pubsubTopic}`);
}
public isEmpty(): boolean {
return this.callbacks.size === 0;
}
public async add<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
for (const decoder of decoders) {
this.addSingle(decoder, callback);
}
return this.toSubscribeContentTopics.size > 0
? await this.attemptSubscribe({ useNewContentTopics: true })
: true; // if content topic is not new - subscription, most likely exists
}
public async remove<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[]
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
for (const decoder of decoders) {
this.removeSingle(decoder);
}
return this.toUnsubscribeContentTopics.size > 0
? await this.attemptUnsubscribe({ useNewContentTopics: true })
: true; // no need to unsubscribe if there are other decoders on the contentTopic
}
public invoke(message: WakuMessage, _peerId: string): void {
if (this.isMessageReceived(message)) {
log.info(
`Skipping invoking callbacks for already received message: pubsubTopic:${this.pubsubTopic}, peerId:${_peerId.toString()}, contentTopic:${message.contentTopic}`
);
return;
}
log.info(`Invoking message for contentTopic: ${message.contentTopic}`);
this.messageEmitter.dispatchEvent(
new CustomEvent<WakuMessage>(message.contentTopic, {
detail: message
})
);
}
private addSingle<T extends IDecodedMessage>(
decoder: IDecoder<T>,
callback: Callback<T>
): void {
log.info(`Adding subscription for contentTopic: ${decoder.contentTopic}`);
const isNewContentTopic = !this.contentTopics.includes(
decoder.contentTopic
);
if (isNewContentTopic) {
this.toSubscribeContentTopics.add(decoder.contentTopic);
}
if (this.callbacks.has(decoder)) {
log.warn(
`Replacing callback associated associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}`
);
const callback = this.callbacks.get(decoder);
this.callbacks.delete(decoder);
this.messageEmitter.removeEventListener(decoder.contentTopic, callback);
}
const eventHandler = (event: CustomEvent<WakuMessage>): void => {
void (async (): Promise<void> => {
try {
const message = await decoder.fromProtoObj(
decoder.pubsubTopic,
event.detail as IProtoMessage
);
void callback(message!);
} catch (err) {
log.error("Error decoding message", err);
}
})();
};
this.callbacks.set(decoder, eventHandler);
this.messageEmitter.addEventListener(decoder.contentTopic, eventHandler);
log.info(
`Subscription added for contentTopic: ${decoder.contentTopic}, isNewContentTopic: ${isNewContentTopic}`
);
}
private removeSingle<T extends IDecodedMessage>(decoder: IDecoder<T>): void {
log.info(`Removing subscription for contentTopic: ${decoder.contentTopic}`);
const callback = this.callbacks.get(decoder);
if (!callback) {
log.warn(
`No callback associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}`
);
}
this.callbacks.delete(decoder);
this.messageEmitter.removeEventListener(decoder.contentTopic, callback);
const isCompletelyRemoved = !this.contentTopics.includes(
decoder.contentTopic
);
if (isCompletelyRemoved) {
this.toUnsubscribeContentTopics.add(decoder.contentTopic);
}
log.info(
`Subscription removed for contentTopic: ${decoder.contentTopic}, isCompletelyRemoved: ${isCompletelyRemoved}`
);
}
private isMessageReceived(message: WakuMessage): boolean {
try {
const messageHash = messageHashStr(
this.pubsubTopic,
message as IProtoMessage
);
if (this.receivedMessages.has(messageHash)) {
return true;
}
this.receivedMessages.add(messageHash);
} catch (e) {
// do nothing on throw, message will be handled as not received
}
return false;
}
private setupSubscriptionInterval(): void {
const subscriptionRefreshIntervalMs = 1000;
log.info(
`Setting up subscription interval with period ${subscriptionRefreshIntervalMs}ms`
);
this.subscribeIntervalId = setInterval(() => {
const run = async (): Promise<void> => {
if (this.toSubscribeContentTopics.size > 0) {
log.info(
`Subscription interval: ${this.toSubscribeContentTopics.size} topics to subscribe`
);
void (await this.attemptSubscribe({ useNewContentTopics: true }));
}
if (this.toUnsubscribeContentTopics.size > 0) {
log.info(
`Subscription interval: ${this.toUnsubscribeContentTopics.size} topics to unsubscribe`
);
void (await this.attemptUnsubscribe({ useNewContentTopics: true }));
}
};
void run();
}, subscriptionRefreshIntervalMs) as unknown as number;
}
private setupKeepAliveInterval(): void {
log.info(
`Setting up keep-alive interval with period ${this.config.keepAliveIntervalMs}ms`
);
this.keepAliveIntervalId = setInterval(() => {
const run = async (): Promise<void> => {
log.info(`Keep-alive interval running for ${this.peers.size} peers`);
let peersToReplace = await Promise.all(
Array.from(this.peers.values()).map(
async (peer): Promise<PeerId | undefined> => {
const response = await this.protocol.ping(peer);
if (response.success) {
log.info(`Ping successful for peer: ${peer.toString()}`);
this.peerFailures.set(peer, 0);
return;
}
let failures = this.peerFailures.get(peer) || 0;
failures += 1;
this.peerFailures.set(peer, failures);
log.warn(
`Ping failed for peer: ${peer.toString()}, failures: ${failures}/${this.config.pingsBeforePeerRenewed}`
);
if (failures < this.config.pingsBeforePeerRenewed) {
return;
}
log.info(
`Peer ${peer.toString()} exceeded max failures (${this.config.pingsBeforePeerRenewed}), will be replaced`
);
return peer;
}
)
);
peersToReplace = peersToReplace.filter((p) => !!p);
await Promise.all(
peersToReplace.map((p) => {
this.peers.delete(p as PeerId);
this.peerFailures.delete(p as PeerId);
return this.requestUnsubscribe(p as PeerId, this.contentTopics);
})
);
if (peersToReplace.length > 0) {
log.info(`Replacing ${peersToReplace.length} failed peers`);
void (await this.attemptSubscribe({
useNewContentTopics: false,
useOnlyNewPeers: true
}));
}
};
void run();
}, this.config.keepAliveIntervalMs) as unknown as number;
}
private setupEventListeners(): void {
this.libp2p.addEventListener(
"peer:connect",
(e) => void this.onPeerConnected(e)
);
this.libp2p.addEventListener(
"peer:disconnect",
(e) => void this.onPeerDisconnected(e)
);
}
private disposeIntervals(): void {
if (this.subscribeIntervalId) {
clearInterval(this.subscribeIntervalId);
}
if (this.keepAliveIntervalId) {
clearInterval(this.keepAliveIntervalId);
}
}
private disposeHandlers(): void {
for (const [decoder, handler] of this.callbacks.entries()) {
this.messageEmitter.removeEventListener(decoder.contentTopic, handler);
}
this.callbacks.clear();
}
private async disposePeers(): Promise<void> {
await this.attemptUnsubscribe({ useNewContentTopics: false });
this.peers.clear();
this.peerFailures = new Map();
}
private disposeEventListeners(): void {
this.libp2p.removeEventListener("peer:connect", this.onPeerConnected);
this.libp2p.removeEventListener("peer:disconnect", this.onPeerDisconnected);
}
private onPeerConnected(event: CustomEvent<PeerId>): void {
log.info(`Peer connected: ${event.detail.toString()}`);
// skip the peer we already subscribe to
if (this.peers.has(event.detail)) {
log.info(`Peer ${event.detail.toString()} already subscribed, skipping`);
return;
}
void this.attemptSubscribe({
useNewContentTopics: false,
useOnlyNewPeers: true
});
}
private onPeerDisconnected(event: CustomEvent<PeerId>): void {
log.info(`Peer disconnected: ${event.detail.toString()}`);
// ignore as the peer is not the one that is in use
if (!this.peers.has(event.detail)) {
log.info(
`Disconnected peer ${event.detail.toString()} not in use, ignoring`
);
return;
}
log.info(
`Active peer ${event.detail.toString()} disconnected, removing from peers list`
);
this.peers.delete(event.detail);
void this.attemptSubscribe({
useNewContentTopics: false,
useOnlyNewPeers: true
});
}
private async attemptSubscribe(
params: AttemptSubscribeParams
): Promise<boolean> {
const { useNewContentTopics, useOnlyNewPeers = false } = params;
const contentTopics = useNewContentTopics
? Array.from(this.toSubscribeContentTopics)
: this.contentTopics;
log.info(
`Attempting to subscribe: useNewContentTopics=${useNewContentTopics}, useOnlyNewPeers=${useOnlyNewPeers}, contentTopics=${contentTopics.length}`
);
if (!contentTopics.length) {
log.warn("Requested content topics is an empty array, skipping");
return false;
}
const prevPeers = new Set(this.peers);
const peersToAdd = this.peerManager.getPeers();
for (const peer of peersToAdd) {
if (this.peers.size >= this.config.numPeersToUse) {
break;
}
this.peers.add(peer);
}
const peersToUse = useOnlyNewPeers
? Array.from(this.peers.values()).filter((p) => !prevPeers.has(p))
: Array.from(this.peers.values());
log.info(
`Subscribing with ${peersToUse.length} peers for ${contentTopics.length} content topics`
);
if (useOnlyNewPeers && peersToUse.length === 0) {
log.warn(`Requested to use only new peers, but no peers found, skipping`);
return false;
}
const results = await Promise.all(
peersToUse.map((p) => this.requestSubscribe(p, contentTopics))
);
const successCount = results.filter((r) => r).length;
log.info(
`Subscribe attempts completed: ${successCount}/${results.length} successful`
);
if (useNewContentTopics) {
this.toSubscribeContentTopics = new Set();
}
return results.some((v) => v);
}
private async requestSubscribe(
peerId: PeerId,
contentTopics: string[]
): Promise<boolean> {
log.info(
`requestSubscribe: pubsubTopic:${this.pubsubTopic}\tcontentTopics:${contentTopics.join(",")}`
);
if (!contentTopics.length || !this.pubsubTopic) {
log.warn(
`requestSubscribe: no contentTopics or pubsubTopic provided, not sending subscribe request`
);
return false;
}
const response = await this.protocol.subscribe(
this.pubsubTopic,
peerId,
contentTopics
);
if (response.failure) {
log.warn(
`requestSubscribe: Failed to subscribe ${this.pubsubTopic} to ${peerId.toString()} with error:${response.failure.error} for contentTopics:${contentTopics}`
);
return false;
}
log.info(
`requestSubscribe: Subscribed ${this.pubsubTopic} to ${peerId.toString()} for contentTopics:${contentTopics}`
);
return true;
}
private async attemptUnsubscribe(
params: AttemptUnsubscribeParams
): Promise<boolean> {
const { useNewContentTopics } = params;
const contentTopics = useNewContentTopics
? Array.from(this.toUnsubscribeContentTopics)
: this.contentTopics;
log.info(
`Attempting to unsubscribe: useNewContentTopics=${useNewContentTopics}, contentTopics=${contentTopics.length}`
);
if (!contentTopics.length) {
log.warn("Requested content topics is an empty array, skipping");
return false;
}
const peersToUse = Array.from(this.peers.values());
const result = await Promise.all(
peersToUse.map((p) =>
this.requestUnsubscribe(
p,
useNewContentTopics ? contentTopics : undefined
)
)
);
const successCount = result.filter((r) => r).length;
log.info(
`Unsubscribe attempts completed: ${successCount}/${result.length} successful`
);
if (useNewContentTopics) {
this.toUnsubscribeContentTopics = new Set();
}
return result.some((v) => v);
}
private async requestUnsubscribe(
peerId: PeerId,
contentTopics?: string[]
): Promise<boolean> {
const response = contentTopics
? await this.protocol.unsubscribe(this.pubsubTopic, peerId, contentTopics)
: await this.protocol.unsubscribeAll(this.pubsubTopic, peerId);
if (response.failure) {
log.warn(
`requestUnsubscribe: Failed to unsubscribe for pubsubTopic:${this.pubsubTopic} from peerId:${peerId.toString()} with error:${response.failure?.error} for contentTopics:${contentTopics}`
);
return false;
}
log.info(
`requestUnsubscribe: Unsubscribed pubsubTopic:${this.pubsubTopic} from peerId:${peerId.toString()} for contentTopics:${contentTopics}`
);
return true;
}
}

View File

@ -0,0 +1,25 @@
import { ConnectionManager } from "@waku/core";
import { FilterCore } from "@waku/core";
import type { Libp2p, NextFilterOptions } from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { PeerManager } from "../peer_manager/index.js";
export type FilterConstructorParams = {
options?: Partial<NextFilterOptions>;
libp2p: Libp2p;
peerManager: PeerManager;
connectionManager: ConnectionManager;
};
export type SubscriptionEvents = {
[contentTopic: string]: CustomEvent<WakuMessage>;
};
export type SubscriptionParams = {
libp2p: Libp2p;
pubsubTopic: string;
protocol: FilterCore;
config: NextFilterOptions;
peerManager: PeerManager;
};

View File

@ -0,0 +1,100 @@
import { expect } from "chai";
import sinon from "sinon";
import { TTLSet } from "./utils.js";
describe("TTLSet", () => {
let clock: sinon.SinonFakeTimers;
beforeEach(() => {
clock = sinon.useFakeTimers();
});
afterEach(() => {
clock.restore();
sinon.restore();
});
it("should add and check entries correctly", () => {
const ttlSet = new TTLSet<string>(60_000);
ttlSet.add("test-entry");
expect(ttlSet.has("test-entry")).to.be.true;
expect(ttlSet.has("non-existent-entry")).to.be.false;
});
it("should support chaining for add method", () => {
const ttlSet = new TTLSet<string>(60_000);
ttlSet.add("entry1").add("entry2");
expect(ttlSet.has("entry1")).to.be.true;
expect(ttlSet.has("entry2")).to.be.true;
});
it("should remove expired entries after TTL has passed", () => {
const ttlSet = new TTLSet<string>(1_000, 500);
ttlSet.add("expiring-entry");
expect(ttlSet.has("expiring-entry")).to.be.true;
clock.tick(1_500);
expect(ttlSet.has("expiring-entry")).to.be.false;
});
it("should keep entries that haven't expired yet", () => {
const ttlSet = new TTLSet<string>(2_000, 500);
ttlSet.add("entry");
expect(ttlSet.has("entry")).to.be.true;
clock.tick(1000);
expect(ttlSet.has("entry")).to.be.true;
});
it("should handle different types of entries", () => {
const numberSet = new TTLSet<number>(60_000);
numberSet.add(123);
expect(numberSet.has(123)).to.be.true;
expect(numberSet.has(456)).to.be.false;
const objectSet = new TTLSet<object>(60_000);
const obj1 = { id: 1 };
const obj2 = { id: 2 };
objectSet.add(obj1);
expect(objectSet.has(obj1)).to.be.true;
expect(objectSet.has(obj2)).to.be.false;
});
it("should properly clean up resources when disposed", () => {
const ttlSet = new TTLSet<string>(60_000);
const clearIntervalSpy = sinon.spy(global, "clearInterval");
ttlSet.add("test-entry");
ttlSet.dispose();
expect(clearIntervalSpy.called).to.be.true;
expect(ttlSet.has("test-entry")).to.be.false;
});
it("should continually clean up expired entries at intervals", () => {
const ttlSet = new TTLSet<string>(1_000, 500);
ttlSet.add("entry1");
clock.tick(750);
expect(ttlSet.has("entry1")).to.be.true;
ttlSet.add("entry2");
clock.tick(750);
expect(ttlSet.has("entry1")).to.be.false;
expect(ttlSet.has("entry2")).to.be.true;
clock.tick(750);
expect(ttlSet.has("entry2")).to.be.false;
});
});

View File

@ -0,0 +1,48 @@
export class TTLSet<T> {
private readonly ttlMs: number;
private cleanupIntervalId: number | null = null;
private readonly entryTimestamps = new Map<T, number>();
/**
* Creates a new CustomSet with TTL functionality.
* @param ttlMs - The time-to-live in milliseconds for each entry.
* @param cleanupIntervalMs - Optional interval between cleanup operations (default: 5000ms).
*/
public constructor(ttlMs: number, cleanupIntervalMs: number = 5000) {
this.ttlMs = ttlMs;
this.startCleanupInterval(cleanupIntervalMs);
}
public dispose(): void {
if (this.cleanupIntervalId !== null) {
clearInterval(this.cleanupIntervalId);
this.cleanupIntervalId = null;
}
this.entryTimestamps.clear();
}
public add(entry: T): this {
this.entryTimestamps.set(entry, Date.now());
return this;
}
public has(entry: T): boolean {
return this.entryTimestamps.has(entry);
}
private startCleanupInterval(intervalMs: number): void {
this.cleanupIntervalId = setInterval(() => {
this.removeExpiredEntries();
}, intervalMs) as unknown as number;
}
private removeExpiredEntries(): void {
const now = Date.now();
for (const [entry, timestamp] of this.entryTimestamps.entries()) {
if (now - timestamp > this.ttlMs) {
this.entryTimestamps.delete(entry);
}
}
}
}

View File

@ -15,6 +15,7 @@ import type {
IEncoder, IEncoder,
IFilter, IFilter,
ILightPush, ILightPush,
INextFilter,
IRelay, IRelay,
IStore, IStore,
IWaku, IWaku,
@ -26,6 +27,7 @@ import { DefaultNetworkConfig, Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
import { Filter } from "../filter/index.js"; import { Filter } from "../filter/index.js";
import { NextFilter } from "../filter_next/index.js";
import { HealthIndicator } from "../health_indicator/index.js"; import { HealthIndicator } from "../health_indicator/index.js";
import { LightPush } from "../light_push/index.js"; import { LightPush } from "../light_push/index.js";
import { PeerManager } from "../peer_manager/index.js"; import { PeerManager } from "../peer_manager/index.js";
@ -51,6 +53,7 @@ export class WakuNode implements IWaku {
public relay?: IRelay; public relay?: IRelay;
public store?: IStore; public store?: IStore;
public filter?: IFilter; public filter?: IFilter;
public nextFilter?: INextFilter;
public lightPush?: ILightPush; public lightPush?: ILightPush;
public connectionManager: ConnectionManager; public connectionManager: ConnectionManager;
public health: HealthIndicator; public health: HealthIndicator;
@ -135,6 +138,13 @@ export class WakuNode implements IWaku {
lightPush: this.lightPush, lightPush: this.lightPush,
options: options.filter options: options.filter
}); });
this.nextFilter = new NextFilter({
libp2p,
connectionManager: this.connectionManager,
peerManager: this.peerManager,
options: options.filter
});
} }
log.info( log.info(

View File

@ -0,0 +1,340 @@
import { LightNode, Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
delay,
runMultipleNodes,
ServiceNodesFleet,
teardownNodesWithRedundancy,
TEST_STRING,
TEST_TIMESTAMPS
} from "../../src/index.js";
import {
messageText,
TestContentTopic,
TestDecoder,
TestEncoder,
TestPubsubTopic,
TestShardInfo
} from "./utils.js";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter Next: FilterPush: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let ctx: Mocha.Context;
beforeEachCustom(this, async () => {
ctx = this.ctx;
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo, {
lightpush: true,
filter: true
});
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
TEST_STRING.forEach((testItem) => {
it(`Check received message containing ${testItem.description}`, async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(testItem.value)
});
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: testItem.value,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
});
TEST_TIMESTAMPS.forEach((testItem) => {
it(`Check received message with timestamp: ${testItem} `, async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: testItem as any
},
TestPubsubTopic
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
checkTimestamp: false,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
// Check if the timestamp matches
const timestamp = serviceNodes.messageCollector.getMessage(0).timestamp;
if (testItem == undefined) {
expect(timestamp).to.eq(undefined);
}
if (timestamp !== undefined && timestamp instanceof Date) {
expect(testItem?.toString()).to.contain(
timestamp.getTime().toString()
);
}
});
});
it("Check message with invalid timestamp is not received", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: "2023-09-06T12:05:38.609Z" as any
},
TestPubsubTopic
);
// Verify that no message was received
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message on other pubsub topic is not received", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
"WrongContentTopic"
);
expect(
await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: TestPubsubTopic
})
).to.eq(false);
});
it("Check message with no pubsub topic is not received", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.nodes[0].restCall<boolean>(
`/relay/v1/messages/`,
"POST",
{
contentTopic: TestContentTopic,
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
async (res) => res.status === 200
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message with no content topic is not received", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"),
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
TestPubsubTopic
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message with no payload is not received", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
timestamp: BigInt(Date.now()) * BigInt(1000000),
payload: undefined as any
},
TestPubsubTopic
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message with non string payload is not received", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
await serviceNodes.sendRelayMessage(
{
contentTopic: TestContentTopic,
payload: 12345 as unknown as string,
timestamp: BigInt(Date.now()) * BigInt(1000000)
},
TestPubsubTopic
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
});
it("Check message received after jswaku node is restarted", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
await waku.stop();
expect(waku.isStarted()).to.eq(false);
await waku.start();
expect(waku.isStarted()).to.eq(true);
for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
}
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
it("Check message received after old nwaku nodes are not available and new are created", async function () {
let callback = serviceNodes.messageCollector.callback;
await waku.nextFilter.subscribe(TestDecoder, (...args) =>
callback(...args)
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
await teardownNodesWithRedundancy(serviceNodes, []);
serviceNodes = await ServiceNodesFleet.createAndRun(
ctx,
2,
false,
TestShardInfo,
{
lightpush: true,
filter: true
},
false
);
callback = serviceNodes.messageCollector.callback;
const peerConnectEvent = new Promise((resolve, reject) => {
waku.libp2p.addEventListener("peer:connect", (e) => {
resolve(e);
});
setTimeout(() => reject, 1000);
});
for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
}
await peerConnectEvent;
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
});
};
[true, false].map(runTests);

View File

@ -0,0 +1,668 @@
import { createDecoder, createEncoder, DecodedMessage } from "@waku/core";
import { IDecoder, LightNode } from "@waku/interfaces";
import {
ecies,
generatePrivateKey,
generateSymmetricKey,
getPublicKey,
symmetric
} from "@waku/message-encryption";
import { Protocols, utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
delay,
generateTestData,
makeLogFileName,
MessageCollector,
runMultipleNodes,
ServiceNode,
ServiceNodesFleet,
tearDownNodes,
teardownNodesWithRedundancy,
TEST_STRING,
waitForConnections
} from "../../src/index.js";
import {
ClusterId,
messagePayload,
messageText,
ShardIndex,
TestContentTopic,
TestDecoder,
TestEncoder,
TestPubsubTopic,
TestShardInfo
} from "./utils.js";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter Next: Subscribe: Multiple Service Nodes: Strict Check mode: ${strictCheckNodes}`, function () {
this.timeout(100000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestShardInfo,
undefined,
strictCheckNodes
);
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
it("Subscribe and receive messages via lightPush", async function () {
expect(waku.libp2p.getConnections()).has.length(2);
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
await serviceNodes.confirmMessageLength(1);
});
it("Subscribe and receive ecies encrypted messages via lightPush", async function () {
const privateKey = generatePrivateKey();
const publicKey = getPublicKey(privateKey);
const encoder = ecies.createEncoder({
contentTopic: TestContentTopic,
publicKey,
pubsubTopic: TestPubsubTopic
});
const decoder = ecies.createDecoder(
TestContentTopic,
privateKey,
TestPubsubTopic
);
await waku.nextFilter.subscribe(
decoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(encoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedVersion: 1,
expectedPubsubTopic: TestPubsubTopic
});
await serviceNodes.confirmMessageLength(2);
});
it("Subscribe and receive symmetrically encrypted messages via lightPush", async function () {
const symKey = generateSymmetricKey();
const encoder = symmetric.createEncoder({
contentTopic: TestContentTopic,
symKey,
pubsubTopic: TestPubsubTopic
});
const decoder = symmetric.createDecoder(
TestContentTopic,
symKey,
TestPubsubTopic
);
await waku.nextFilter.subscribe(
decoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(encoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedVersion: 1,
expectedPubsubTopic: TestPubsubTopic
});
await serviceNodes.confirmMessageLength(2);
});
it("Subscribe and receive messages via waku relay post", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await delay(400);
// Send a test message using the relay post method.
const relayMessage = ServiceNodesFleet.toMessageRpcQuery({
contentTopic: TestContentTopic,
payload: utf8ToBytes(messageText)
});
await serviceNodes.sendRelayMessage(relayMessage, TestPubsubTopic);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
await serviceNodes.confirmMessageLength(1);
});
it("Subscribe and receive 2 messages on the same topic", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
// Send another message on the same topic.
const newMessageText = "Filtering still works!";
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(newMessageText)
});
// Verify that the second message was successfully received.
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: newMessageText,
expectedContentTopic: TestContentTopic
});
await serviceNodes.confirmMessageLength(2);
});
it("Subscribe and receive messages on 2 different content topics", async function () {
// Subscribe to the first content topic and send a message.
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
// Modify subscription to include a new content topic and send a message.
const newMessageText = "Filtering still works!";
const newMessagePayload = { payload: utf8ToBytes(newMessageText) };
const newContentTopic = "/test/2/waku-filter/default";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.nextFilter.subscribe(
newDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(newEncoder, {
payload: utf8ToBytes(newMessageText)
});
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: newContentTopic,
expectedMessageText: newMessageText,
expectedPubsubTopic: TestPubsubTopic
});
// Send another message on the initial content topic to verify it still works.
await waku.lightPush.send(TestEncoder, newMessagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(2, {
expectedMessageText: newMessageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
await serviceNodes.confirmMessageLength(3);
});
it("Subscribe and receives messages on 20 topics", async function () {
const topicCount = 20;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
// Subscribe to all 20 topics.
for (let i = 0; i < topicCount; i++) {
await waku.nextFilter.subscribe(
td.decoders[i],
serviceNodes.messageCollector.callback
);
}
// Send a unique message on each topic.
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
// Verify that each message was received on the corresponding topic.
expect(await serviceNodes.messageCollector.waitForMessages(20)).to.eq(
true
);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
});
});
// skip for now, will be enabled once old Filter is removed as it exausts amount of streams avaialble
it.skip("Subscribe to 30 topics in separate streams (30 streams for Filter is limit) at once and receives messages", async function () {
this.timeout(100_000);
const topicCount = 30;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
for (let i = 0; i < topicCount; i++) {
await waku.nextFilter.subscribe(
td.decoders[i],
serviceNodes.messageCollector.callback
);
}
// Send a unique message on each topic.
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
// Verify that each message was received on the corresponding topic.
expect(
await serviceNodes.messageCollector.waitForMessages(topicCount)
).to.eq(true);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
});
});
it("Subscribe to 100 topics (new limit) at once and receives messages", async function () {
this.timeout(100_000);
const topicCount = 100;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
await waku.nextFilter.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
// Send a unique message on each topic.
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
// Verify that each message was received on the corresponding topic.
expect(
await serviceNodes.messageCollector.waitForMessages(topicCount)
).to.eq(true);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
});
});
it("Error when try to subscribe to more than 101 topics (new limit)", async function () {
const topicCount = 101;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
try {
await waku.nextFilter.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
} catch (err) {
if (
err instanceof Error &&
err.message.includes(
`exceeds maximum content topics: ${topicCount - 1}`
)
) {
return;
} else {
throw err;
}
}
});
it("Overlapping topic subscription", async function () {
// Define two sets of test data with overlapping topics.
const topicCount1 = 2;
const td1 = generateTestData(topicCount1, {
pubsubTopic: TestPubsubTopic
});
const topicCount2 = 4;
const td2 = generateTestData(topicCount2, {
pubsubTopic: TestPubsubTopic
});
await waku.nextFilter.subscribe(
td1.decoders,
serviceNodes.messageCollector.callback
);
// Subscribe to the second set of topics which has overlapping topics with the first set.
await waku.nextFilter.subscribe(
td2.decoders,
serviceNodes.messageCollector.callback
);
// Send messages to the first set of topics.
for (let i = 0; i < topicCount1; i++) {
const messageText = `Topic Set 1: Message Number: ${i + 1}`;
await waku.lightPush.send(td1.encoders[i], {
payload: utf8ToBytes(messageText)
});
}
// Send messages to the second set of topics.
for (let i = 0; i < topicCount2; i++) {
const messageText = `Topic Set 2: Message Number: ${i + 1}`;
await waku.lightPush.send(td2.encoders[i], {
payload: utf8ToBytes(messageText)
});
}
// Since there are overlapping topics, there should be 10 messages in total because overlaping decoders handle them
expect(
await serviceNodes.messageCollector.waitForMessages(10, { exact: true })
).to.eq(true);
});
it("Refresh subscription", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
// Resubscribe (refresh) to the same topic and send another message.
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm both messages were received.
expect(
await serviceNodes.messageCollector.waitForMessages(2, { exact: true })
).to.eq(true);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: "M2",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
TEST_STRING.forEach((testItem) => {
it(`Subscribe to topic containing ${testItem.description} and receive message`, async function () {
const newContentTopic = testItem.value;
const newEncoder = waku.createEncoder({
contentTopic: newContentTopic,
shardInfo: {
clusterId: ClusterId,
shard: ShardIndex
}
});
const newDecoder = waku.createDecoder({
contentTopic: newContentTopic,
shardInfo: {
clusterId: ClusterId,
shard: ShardIndex
}
});
await waku.nextFilter.subscribe(
newDecoder as IDecoder<DecodedMessage>,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(newEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: newContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
});
});
it("Add multiple subscription objects on single nwaku node", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
const newContentTopic = "/test/2/waku-filter/default";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.nextFilter.subscribe(
newDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
// Check if both messages were received
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestPubsubTopic
});
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: newContentTopic,
expectedMessageText: "M2",
expectedPubsubTopic: TestPubsubTopic
});
});
it("Renews subscription after lossing a connection", async function () {
// setup check
expect(waku.libp2p.getConnections()).has.length(2);
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
await serviceNodes.confirmMessageLength(1);
// check renew logic
const nwakuPeers = await Promise.all(
serviceNodes.nodes.map((v) => v.getMultiaddrWithId())
);
await Promise.all(nwakuPeers.map((v) => waku.libp2p.hangUp(v)));
expect(waku.libp2p.getConnections().length).eq(0);
await Promise.all(nwakuPeers.map((v) => waku.libp2p.dial(v)));
await waitForConnections(nwakuPeers.length, waku);
const testText = "second try";
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(testText)
});
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedMessageText: testText,
expectedContentTopic: TestContentTopic
});
});
it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
// Set up and start a new nwaku node with customPubsubTopic1
const nwaku2 = new ServiceNode(makeLogFileName(this) + "3");
try {
const customContentTopic = "/test/4/waku-filter/default";
const customDecoder = createDecoder(customContentTopic, {
clusterId: ClusterId,
shard: 4
});
const customEncoder = createEncoder({
contentTopic: customContentTopic,
pubsubTopicShardInfo: { clusterId: ClusterId, shard: 4 }
});
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
clusterId: ClusterId,
shard: [4]
});
await waku.dial(await nwaku2.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
await nwaku2.ensureSubscriptions([customDecoder.pubsubTopic]);
const messageCollector2 = new MessageCollector();
await waku.nextFilter.subscribe(
customDecoder,
messageCollector2.callback
);
// Making sure that messages are send and reveiced for both subscriptions
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
while (
!(await serviceNodes.messageCollector.waitForMessages(1, {
pubsubTopic: TestDecoder.pubsubTopic
})) ||
!(await messageCollector2.waitForMessages(1, {
pubsubTopic: customDecoder.pubsubTopic
}))
) {
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("M1")
});
await waku.lightPush.send(customEncoder, {
payload: utf8ToBytes("M2")
});
}
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: TestDecoder.contentTopic,
expectedPubsubTopic: TestDecoder.pubsubTopic,
expectedMessageText: "M1"
});
messageCollector2.verifyReceivedMessage(0, {
expectedContentTopic: customDecoder.contentTopic,
expectedPubsubTopic: customDecoder.pubsubTopic,
expectedMessageText: "M2"
});
} catch (e) {
await tearDownNodes([nwaku2], []);
}
});
it("Should fail to subscribe with decoder with wrong shard", async function () {
const wrongDecoder = createDecoder(TestDecoder.contentTopic, {
clusterId: ClusterId,
shard: 5
});
// this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2`
try {
await waku.nextFilter.subscribe(
wrongDecoder,
serviceNodes.messageCollector.callback
);
} catch (error) {
expect((error as Error).message).to.include(
`Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.`
);
}
});
});
};
[true, false].map((strictCheckNodes) => runTests(strictCheckNodes));

View File

@ -0,0 +1,214 @@
import { createDecoder, createEncoder } from "@waku/core";
import { type LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
generateTestData,
runMultipleNodes,
ServiceNodesFleet,
teardownNodesWithRedundancy
} from "../../src/index.js";
import {
ClusterId,
messagePayload,
messageText,
TestContentTopic,
TestDecoder,
TestEncoder,
TestPubsubTopic
} from "./utils.js";
const runTests = (strictCheckNodes: boolean): void => {
describe(`Waku Filter Next: Unsubscribe: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(10000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
{
contentTopics: [TestContentTopic],
clusterId: ClusterId
},
{ filter: true, lightpush: true }
);
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
await waku.nextFilter.unsubscribe(TestDecoder);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
false
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic
});
expect(serviceNodes.messageCollector.count).to.eq(1);
await serviceNodes.confirmMessageLength(2);
});
it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
const newContentTopic = "/test/2/waku-filter";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.nextFilter.subscribe(
newDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
// Unsubscribe from the first topic and send again
await waku.nextFilter.unsubscribe(TestDecoder);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") });
expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq(
true
);
// Check that from 4 messages send 3 were received
expect(serviceNodes.messageCollector.count).to.eq(3);
await serviceNodes.confirmMessageLength(4);
});
it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
const newContentTopic = "/test/2/waku-filter";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.nextFilter.subscribe(
newDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
// Unsubscribe from both and send again
await waku.nextFilter.unsubscribe(TestDecoder);
await waku.nextFilter.unsubscribe(newDecoder);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") });
expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq(
false
);
// Check that from 4 messages send 2 were received
expect(serviceNodes.messageCollector.count).to.eq(2);
await serviceNodes.confirmMessageLength(4);
});
it("Unsubscribe topics the node is not subscribed to", async function () {
// Subscribe to 1 topic and send message
await waku.nextFilter.subscribe(
TestDecoder,
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
);
expect(serviceNodes.messageCollector.count).to.eq(1);
// Unsubscribe from topics that the node is not not subscribed to and send again
await waku.nextFilter.unsubscribe(
createDecoder("/test/2/waku-filter", TestDecoder.pubsubTopic)
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true
);
// Check that both messages were received
expect(serviceNodes.messageCollector.count).to.eq(2);
await serviceNodes.confirmMessageLength(2);
});
it("Unsubscribe from 100 topics (new limit) at once and receives messages", async function () {
this.timeout(100_000);
const topicCount = 100;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
await waku.nextFilter.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
expect(
await serviceNodes.messageCollector.waitForMessages(topicCount)
).to.eq(true);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
});
await waku.nextFilter.unsubscribe(td.decoders);
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
});
}
expect(serviceNodes.messageCollector.count).to.eq(100);
});
});
};
[true, false].map(runTests);

View File

@ -0,0 +1,166 @@
import { createDecoder, createEncoder } from "@waku/core";
import {
CreateNodeOptions,
DefaultNetworkConfig,
ISubscription,
IWaku,
LightNode,
NetworkConfig,
Protocols
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import {
contentTopicToPubsubTopic,
contentTopicToShardIndex,
derivePubsubTopicsFromNetworkConfig,
Logger
} from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { Context } from "mocha";
import pRetry from "p-retry";
import {
NOISE_KEY_1,
ServiceNodesFleet,
waitForConnections
} from "../../src/index.js";
// Constants for test configuration.
export const log = new Logger("test:filter");
export const TestContentTopic = "/test/1/waku-filter/default";
export const ClusterId = 2;
export const ShardIndex = contentTopicToShardIndex(TestContentTopic);
export const TestShardInfo = {
contentTopics: [TestContentTopic],
clusterId: ClusterId
};
export const TestPubsubTopic = contentTopicToPubsubTopic(
TestContentTopic,
ClusterId
);
export const TestEncoder = createEncoder({
contentTopic: TestContentTopic,
pubsubTopic: TestPubsubTopic
});
export const TestDecoder = createDecoder(TestContentTopic, TestPubsubTopic);
export const messageText = "Filtering works!";
export const messagePayload = { payload: utf8ToBytes(messageText) };
// Utility to validate errors related to pings in the subscription.
export async function validatePingError(
subscription: ISubscription
): Promise<void> {
try {
const { failures, successes } = await subscription.ping();
if (failures.length === 0 || successes.length > 0) {
throw new Error(
"Ping was successful but was expected to fail with a specific error."
);
}
} catch (err) {
if (
err instanceof Error &&
err.message.includes("peer has no subscriptions")
) {
return;
} else {
throw err;
}
}
}
export async function runMultipleNodes(
context: Context,
networkConfig: NetworkConfig = DefaultNetworkConfig,
strictChecking: boolean = false,
numServiceNodes = 3,
withoutFilter = false
): Promise<[ServiceNodesFleet, LightNode]> {
const pubsubTopics = derivePubsubTopicsFromNetworkConfig(networkConfig);
// create numServiceNodes nodes
const serviceNodes = await ServiceNodesFleet.createAndRun(
context,
numServiceNodes,
strictChecking,
networkConfig,
undefined,
withoutFilter
);
const wakuOptions: CreateNodeOptions = {
staticNoiseKey: NOISE_KEY_1,
libp2p: {
addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] }
}
};
log.info("Starting js waku node with :", JSON.stringify(wakuOptions));
let waku: LightNode | undefined;
try {
waku = await createLightNode(wakuOptions);
await waku.start();
} catch (error) {
log.error("jswaku node failed to start:", error);
}
if (!waku) {
throw new Error("Failed to initialize waku");
}
for (const node of serviceNodes.nodes) {
await waku.dial(await node.getMultiaddrWithId());
await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]);
await node.ensureSubscriptions(pubsubTopics);
const wakuConnections = waku.libp2p.getConnections();
if (wakuConnections.length < 1) {
throw new Error(`Expected at least 1 connection for js-waku.`);
}
await node.waitForLog(waku.libp2p.peerId.toString(), 100);
}
await waitForConnections(numServiceNodes, waku);
return [serviceNodes, waku];
}
export async function teardownNodesWithRedundancy(
serviceNodes: ServiceNodesFleet,
wakuNodes: IWaku | IWaku[]
): Promise<void> {
const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes];
const stopNwakuNodes = serviceNodes.nodes.map(async (node) => {
await pRetry(
async () => {
try {
await node.stop();
} catch (error) {
log.error("Service Node failed to stop:", error);
throw error;
}
},
{ retries: 3 }
);
});
const stopWakuNodes = wNodes.map(async (waku) => {
if (waku) {
await pRetry(
async () => {
try {
await waku.stop();
} catch (error) {
log.error("Waku failed to stop:", error);
throw error;
}
},
{ retries: 3 }
);
}
});
await Promise.all([...stopNwakuNodes, ...stopWakuNodes]);
}