From a4dfd3455c88db6ff60531c15a58128afb25db05 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Wed, 28 May 2025 00:44:44 +0200 Subject: [PATCH] feat: event based approach to Filter (#2300) * create new filter api * implement await on main methods on new Filter * add info logs in new filter * add logs to subscription impl * remove lint supress * add unit tests * introduce E2E tests * update e2e tests and add case for testing filter recovery after nwaku nodes replacement * add new test cases for max limits and enable decoders as array on new filter * fix edge case testing, correct test cases * skip test * update error message * up text * up text * fix lint * implement unsubscribeAll * add js-dock to new filter * add cspell * implement TTL set for message history --- .cspell.json | 2 + packages/core/src/lib/filter/filter.ts | 39 +- packages/interfaces/src/filter_next.ts | 96 +++ packages/interfaces/src/index.ts | 1 + packages/interfaces/src/waku.ts | 8 + packages/sdk/src/filter_next/filter.spec.ts | 205 ++++++ packages/sdk/src/filter_next/filter.ts | 255 +++++++ packages/sdk/src/filter_next/index.ts | 1 + .../sdk/src/filter_next/subscription.spec.ts | 239 +++++++ packages/sdk/src/filter_next/subscription.ts | 593 ++++++++++++++++ packages/sdk/src/filter_next/types.ts | 25 + packages/sdk/src/filter_next/utils.spec.ts | 100 +++ packages/sdk/src/filter_next/utils.ts | 48 ++ packages/sdk/src/waku/waku.ts | 10 + .../tests/tests/filter next/push.node.spec.ts | 340 +++++++++ .../tests/filter next/subscribe.node.spec.ts | 668 ++++++++++++++++++ .../filter next/unsubscribe.node.spec.ts | 214 ++++++ packages/tests/tests/filter next/utils.ts | 166 +++++ 18 files changed, 3004 insertions(+), 6 deletions(-) create mode 100644 packages/interfaces/src/filter_next.ts create mode 100644 packages/sdk/src/filter_next/filter.spec.ts create mode 100644 packages/sdk/src/filter_next/filter.ts create mode 100644 packages/sdk/src/filter_next/index.ts create mode 100644 packages/sdk/src/filter_next/subscription.spec.ts create mode 100644 packages/sdk/src/filter_next/subscription.ts create mode 100644 packages/sdk/src/filter_next/types.ts create mode 100644 packages/sdk/src/filter_next/utils.spec.ts create mode 100644 packages/sdk/src/filter_next/utils.ts create mode 100644 packages/tests/tests/filter next/push.node.spec.ts create mode 100644 packages/tests/tests/filter next/subscribe.node.spec.ts create mode 100644 packages/tests/tests/filter next/unsubscribe.node.spec.ts create mode 100644 packages/tests/tests/filter next/utils.ts diff --git a/.cspell.json b/.cspell.json index 731b362bbb..61fedbe688 100644 --- a/.cspell.json +++ b/.cspell.json @@ -40,7 +40,9 @@ "Encrypters", "enr", "enrs", + "unsubscription", "enrtree", + "unhandle", "ephem", "esnext", "ethersproject", diff --git a/packages/core/src/lib/filter/filter.ts b/packages/core/src/lib/filter/filter.ts index 5c672a28fc..0c2ed77f32 100644 --- a/packages/core/src/lib/filter/filter.ts +++ b/packages/core/src/lib/filter/filter.ts @@ -30,18 +30,45 @@ export const FilterCodecs = { PUSH: "/vac/waku/filter-push/2.0.0-beta1" }; +type IncomingMessageHandler = ( + pubsubTopic: PubsubTopic, + wakuMessage: WakuMessage, + peerIdStr: string +) => Promise; + export class FilterCore extends BaseProtocol implements IBaseProtocolCore { + private static handleIncomingMessage?: IncomingMessageHandler; + public constructor( - private handleIncomingMessage: ( - pubsubTopic: PubsubTopic, - wakuMessage: WakuMessage, - peerIdStr: string - ) => Promise, + handleIncomingMessage: IncomingMessageHandler, public readonly pubsubTopics: PubsubTopic[], libp2p: Libp2p ) { super(FilterCodecs.SUBSCRIBE, libp2p.components, pubsubTopics); + // TODO(weboko): remove when @waku/sdk 0.0.33 is released + const prevHandler = FilterCore.handleIncomingMessage; + FilterCore.handleIncomingMessage = !prevHandler + ? handleIncomingMessage + : async (pubsubTopic, message, peerIdStr): Promise => { + try { + await prevHandler(pubsubTopic, message, peerIdStr); + } catch (e) { + log.error( + "Previous FilterCore incoming message handler failed ", + e + ); + } + + try { + await handleIncomingMessage(pubsubTopic, message, peerIdStr); + } catch (e) { + log.error("Present FilterCore incoming message handler failed ", e); + } + + return; + }; + libp2p .handle(FilterCodecs.PUSH, this.onRequest.bind(this), { maxInboundStreams: 100 @@ -291,7 +318,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { return; } - await this.handleIncomingMessage( + await FilterCore.handleIncomingMessage?.( pubsubTopic, wakuMessage, connection.remotePeer.toString() diff --git a/packages/interfaces/src/filter_next.ts b/packages/interfaces/src/filter_next.ts new file mode 100644 index 0000000000..f995c6879a --- /dev/null +++ b/packages/interfaces/src/filter_next.ts @@ -0,0 +1,96 @@ +import type { IDecodedMessage, IDecoder } from "./message.js"; +import type { Callback } from "./protocols.js"; + +export type INextFilter = { + /** + * Subscribes to messages with specified decoders and executes callback when a message is received. + * In case no peers available initially - will delay subscription till connects to any peer. + * + * @param decoders - Single decoder or array of decoders to subscribe to. All decoders must share the same pubsubTopic. + * @param callback - Function called when a message matching the decoder's contentTopic is received. + * @returns Promise that resolves to true if subscription was successful, false otherwise. + * + * @example + * // Subscribe to a single content topic + * await filter.subscribe(decoder, (msg) => console.log(msg)); + * + * @example + * // Subscribe to multiple content topics with the same pubsub topic + * await filter.subscribe([decoder1, decoder2], (msg) => console.log(msg)); + * + * @example + * // Handle subscription failure + * const success = await filter.subscribe(decoder, handleMessage); + * if (!success) { + * console.error("Failed to subscribe"); + * } + */ + subscribe( + decoders: IDecoder | IDecoder[], + callback: Callback + ): Promise; + + /** + * Unsubscribes from messages with specified decoders. + * + * @param decoders - Single decoder or array of decoders to unsubscribe from. All decoders must share the same pubsubTopic. + * @returns Promise that resolves to true if unsubscription was successful, false otherwise. + * + * @example + * // Unsubscribe from a single decoder + * await filter.unsubscribe(decoder); + * + * @example + * // Unsubscribe from multiple decoders at once + * await filter.unsubscribe([decoder1, decoder2]); + * + * @example + * // Handle unsubscription failure + * const success = await filter.unsubscribe(decoder); + * if (!success) { + * console.error("Failed to unsubscribe"); + * } + */ + unsubscribe( + decoders: IDecoder | IDecoder[] + ): Promise; + + /** + * Unsubscribes from all active subscriptions across all pubsub topics. + * + * @example + * // Clean up all subscriptions when React component unmounts + * useEffect(() => { + * return () => filter.unsubscribeAll(); + * }, [filter]); + * + * @example + * // Reset subscriptions and start over + * filter.unsubscribeAll(); + * await filter.subscribe(newDecoder, newCallback); + */ + unsubscribeAll(): void; +}; + +export type NextFilterOptions = { + /** + * Interval with which Filter subscription will attempt to send ping requests to subscribed peers. + * + * @default 60_000 + */ + keepAliveIntervalMs: number; + + /** + * Number of failed pings allowed to make to a remote peer before attempting to subscribe to a new one. + * + * @default 3 + */ + pingsBeforePeerRenewed: number; + + /** + * Number of peers to be used for establishing subscriptions. + * + * @default 2 + */ + numPeersToUse: number; +}; diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 4887607c5c..4ead41bb37 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -1,5 +1,6 @@ export * from "./enr.js"; export * from "./filter.js"; +export * from "./filter_next.js"; export * from "./light_push.js"; export * from "./message.js"; export * from "./peer_exchange.js"; diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 7df3ed511a..7aef9e9a9d 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -3,6 +3,7 @@ import type { MultiaddrInput } from "@multiformats/multiaddr"; import type { IConnectionManager } from "./connection_manager.js"; import type { IFilter } from "./filter.js"; +import type { INextFilter } from "./filter_next.js"; import type { IHealthIndicator } from "./health_indicator.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; @@ -34,7 +35,13 @@ export interface IWaku { libp2p: Libp2p; relay?: IRelay; store?: IStore; + + /** + * @deprecated use IWaku.nextFilter instead + */ filter?: IFilter; + + nextFilter?: INextFilter; lightPush?: ILightPush; connectionManager: IConnectionManager; health: IHealthIndicator; @@ -210,6 +217,7 @@ export interface LightNode extends IWaku { relay: undefined; store: IStore; filter: IFilter; + nextFilter: INextFilter; lightPush: ILightPush; } diff --git a/packages/sdk/src/filter_next/filter.spec.ts b/packages/sdk/src/filter_next/filter.spec.ts new file mode 100644 index 0000000000..757b18865b --- /dev/null +++ b/packages/sdk/src/filter_next/filter.spec.ts @@ -0,0 +1,205 @@ +import { ConnectionManager, createDecoder } from "@waku/core"; +import type { + IDecodedMessage, + IDecoder, + IProtoMessage, + Libp2p +} from "@waku/interfaces"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { PeerManager } from "../peer_manager/index.js"; + +import { Filter } from "./filter.js"; +import { Subscription } from "./subscription.js"; + +const PUBSUB_TOPIC = "/waku/2/rs/1/4"; +const CONTENT_TOPIC = "/test/1/waku-filter/utf8"; + +describe("Filter SDK", () => { + let libp2p: Libp2p; + let filter: Filter; + let decoder: IDecoder; + let callback: sinon.SinonSpy; + let connectionManager: ConnectionManager; + let peerManager: PeerManager; + + beforeEach(() => { + libp2p = mockLibp2p(); + connectionManager = mockConnectionManager(); + peerManager = mockPeerManager(); + filter = mockFilter({ libp2p, connectionManager, peerManager }); + decoder = createDecoder(CONTENT_TOPIC, PUBSUB_TOPIC); + callback = sinon.spy(); + }); + + afterEach(() => { + sinon.restore(); + }); + + it("should throw error when subscribing with unsupported pubsub topic", async () => { + const unsupportedDecoder = createDecoder( + CONTENT_TOPIC, + "/unsupported/topic" + ); + + try { + await filter.subscribe(unsupportedDecoder, callback); + expect.fail("Should have thrown an error"); + } catch (error) { + expect((error as Error).message).to.include( + "Pubsub topic /unsupported/topic has not been configured on this instance." + ); + } + }); + + it("should successfully subscribe to supported pubsub topic", async () => { + const addStub = sinon.stub(Subscription.prototype, "add").resolves(true); + const startStub = sinon.stub(Subscription.prototype, "start"); + + const result = await filter.subscribe(decoder, callback); + + expect(result).to.be.true; + expect(addStub.calledOnce).to.be.true; + expect(startStub.calledOnce).to.be.true; + }); + + it("should throw error when unsubscribing with unsupported pubsub topic", async () => { + const unsupportedDecoder = createDecoder( + CONTENT_TOPIC, + "/unsupported/topic" + ); + + try { + await filter.unsubscribe(unsupportedDecoder); + expect.fail("Should have thrown an error"); + } catch (error) { + expect((error as Error).message).to.include( + "Pubsub topic /unsupported/topic has not been configured on this instance." + ); + } + }); + + it("should return false when unsubscribing from a non-existing subscription", async () => { + const result = await filter.unsubscribe(decoder); + expect(result).to.be.false; + }); + + it("should successfully unsubscribe from an existing subscription", async () => { + sinon.stub(Subscription.prototype, "add").resolves(true); + sinon.stub(Subscription.prototype, "start"); + await filter.subscribe(decoder, callback); + + const removeStub = sinon + .stub(Subscription.prototype, "remove") + .resolves(true); + const isEmptyStub = sinon + .stub(Subscription.prototype, "isEmpty") + .returns(true); + const stopStub = sinon.stub(Subscription.prototype, "stop"); + + const result = await filter.unsubscribe(decoder); + + expect(result).to.be.true; + expect(removeStub.calledOnce).to.be.true; + expect(isEmptyStub.calledOnce).to.be.true; + expect(stopStub.calledOnce).to.be.true; + }); + + it("should handle incoming messages", async () => { + const subscriptionInvokeStub = sinon.stub(Subscription.prototype, "invoke"); + sinon.stub(Subscription.prototype, "add").resolves(true); + + await filter.subscribe(decoder, callback); + + const message = createMockMessage(CONTENT_TOPIC); + const peerId = "peer1"; + + await (filter as any).onIncomingMessage(PUBSUB_TOPIC, message, peerId); + + expect(subscriptionInvokeStub.calledOnce).to.be.true; + expect(subscriptionInvokeStub.firstCall.args[0]).to.equal(message); + expect(subscriptionInvokeStub.firstCall.args[1]).to.equal(peerId); + }); + + it("should successfully stop", async () => { + const decoder2 = createDecoder("/another-content-topic", PUBSUB_TOPIC); + const stopStub = sinon.stub(Subscription.prototype, "stop"); + + sinon.stub(Subscription.prototype, "add").resolves(true); + sinon.stub(Subscription.prototype, "start"); + + await filter.subscribe(decoder, callback); + await filter.subscribe(decoder2, callback); + + filter.unsubscribeAll(); + + expect(stopStub.calledOnce).to.be.true; + + const result = await filter.unsubscribe(decoder); + expect(result).to.be.false; + }); +}); + +function mockLibp2p(): Libp2p { + return { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub(), + handle: sinon.stub().resolves(), + components: { + events: { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub() + }, + connectionManager: { + getConnections: sinon.stub().returns([]) + } + } + } as unknown as Libp2p; +} + +function mockConnectionManager(): ConnectionManager { + return { + pubsubTopics: [PUBSUB_TOPIC] + } as ConnectionManager; +} + +function mockPeerManager(): PeerManager { + return { + getPeers: sinon.stub().returns([]) + } as unknown as PeerManager; +} + +type MockFilterOptions = { + libp2p: Libp2p; + connectionManager?: ConnectionManager; + peerManager?: PeerManager; +}; + +function mockFilter(options: MockFilterOptions): Filter { + const filter = new Filter({ + libp2p: options.libp2p, + connectionManager: options.connectionManager || mockConnectionManager(), + peerManager: options.peerManager || mockPeerManager(), + options: { + numPeersToUse: 2, + pingsBeforePeerRenewed: 3, + keepAliveIntervalMs: 60_000 + } + }); + + // we're not actually testing FilterCore functionality here + return filter; +} + +function createMockMessage(contentTopic: string): IProtoMessage { + return { + payload: new Uint8Array(), + contentTopic, + version: 0, + timestamp: BigInt(Date.now()), + meta: undefined, + rateLimitProof: undefined, + ephemeral: false + }; +} diff --git a/packages/sdk/src/filter_next/filter.ts b/packages/sdk/src/filter_next/filter.ts new file mode 100644 index 0000000000..ae65302f6e --- /dev/null +++ b/packages/sdk/src/filter_next/filter.ts @@ -0,0 +1,255 @@ +import { ConnectionManager, FilterCore } from "@waku/core"; +import type { + Callback, + NextFilterOptions as FilterOptions, + IDecodedMessage, + IDecoder, + INextFilter as IFilter, + Libp2p +} from "@waku/interfaces"; +import { WakuMessage } from "@waku/proto"; +import { Logger } from "@waku/utils"; + +import { PeerManager } from "../peer_manager/index.js"; + +import { Subscription } from "./subscription.js"; +import { FilterConstructorParams } from "./types.js"; + +const log = new Logger("sdk:next-filter"); + +type PubsubTopic = string; + +export class Filter implements IFilter { + private readonly libp2p: Libp2p; + private readonly protocol: FilterCore; + private readonly peerManager: PeerManager; + private readonly connectionManager: ConnectionManager; + + private readonly config: FilterOptions; + private subscriptions = new Map(); + + public constructor(params: FilterConstructorParams) { + this.config = { + numPeersToUse: 2, + pingsBeforePeerRenewed: 3, + keepAliveIntervalMs: 60_000, + ...params.options + }; + + this.libp2p = params.libp2p; + this.peerManager = params.peerManager; + this.connectionManager = params.connectionManager; + + this.protocol = new FilterCore( + this.onIncomingMessage.bind(this), + params.connectionManager.pubsubTopics, + params.libp2p + ); + } + + /** + * Unsubscribes from all active subscriptions across all pubsub topics. + * + * @example + * // Clean up all subscriptions when React component unmounts + * useEffect(() => { + * return () => filter.unsubscribeAll(); + * }, [filter]); + * + * @example + * // Reset subscriptions and start over + * filter.unsubscribeAll(); + * await filter.subscribe(newDecoder, newCallback); + */ + public unsubscribeAll(): void { + for (const subscription of this.subscriptions.values()) { + subscription.stop(); + } + + this.subscriptions.clear(); + } + + /** + * Subscribes to messages with specified decoders and executes callback when a message is received. + * In case no peers available initially - will delay subscription till connects to any peer. + * + * @param decoders - Single decoder or array of decoders to subscribe to. All decoders must share the same pubsubTopic. + * @param callback - Function called when a message matching the decoder's contentTopic is received. + * @returns Promise that resolves to true if subscription was successful, false otherwise. + * + * @example + * // Subscribe to a single content topic + * await filter.subscribe(decoder, (msg) => console.log(msg)); + * + * @example + * // Subscribe to multiple content topics with the same pubsub topic + * await filter.subscribe([decoder1, decoder2], (msg) => console.log(msg)); + * + * @example + * // Handle subscription failure + * const success = await filter.subscribe(decoder, handleMessage); + * if (!success) { + * console.error("Failed to subscribe"); + * } + */ + public async subscribe( + decoder: IDecoder | IDecoder[], + callback: Callback + ): Promise { + const decoders = Array.isArray(decoder) ? decoder : [decoder]; + + if (decoders.length === 0) { + throw Error("Cannot subscribe with 0 decoders."); + } + + const pubsubTopics = decoders.map((v) => v.pubsubTopic); + const contentTopics = decoders.map((v) => v.contentTopic); + + // doing this for simplicity, we can enable subscription for more than one PubsubTopic at once later when requested + if (!this.isSamePubsubTopic(decoders)) { + throw Error( + `Cannot subscribe to more than one pubsub topic at the same time, got pubsubTopics:${pubsubTopics}` + ); + } + + log.info( + `Subscribing to content topic: ${contentTopics}, pubsub topic: ${pubsubTopics}` + ); + + const supportedPubsubTopic = this.connectionManager.pubsubTopics.includes( + pubsubTopics[0] + ); + + if (!supportedPubsubTopic) { + throw Error( + `Pubsub topic ${pubsubTopics[0]} has not been configured on this instance.` + ); + } + + let subscription = this.subscriptions.get(pubsubTopics[0]); + if (!subscription) { + subscription = new Subscription({ + pubsubTopic: pubsubTopics[0], + libp2p: this.libp2p, + protocol: this.protocol, + config: this.config, + peerManager: this.peerManager + }); + subscription.start(); + } + + const result = await subscription.add(decoders, callback); + this.subscriptions.set(pubsubTopics[0], subscription); + + log.info( + `Subscription ${result ? "successful" : "failed"} for content topic: ${contentTopics}` + ); + + return result; + } + + /** + * Unsubscribes from messages with specified decoders. + * + * @param decoders - Single decoder or array of decoders to unsubscribe from. All decoders must share the same pubsubTopic. + * @returns Promise that resolves to true if unsubscription was successful, false otherwise. + * + * @example + * // Unsubscribe from a single decoder + * await filter.unsubscribe(decoder); + * + * @example + * // Unsubscribe from multiple decoders at once + * await filter.unsubscribe([decoder1, decoder2]); + * + * @example + * // Handle unsubscription failure + * const success = await filter.unsubscribe(decoder); + * if (!success) { + * console.error("Failed to unsubscribe"); + * } + */ + public async unsubscribe( + decoder: IDecoder | IDecoder[] + ): Promise { + const decoders = Array.isArray(decoder) ? decoder : [decoder]; + + if (decoders.length === 0) { + throw Error("Cannot unsubscribe with 0 decoders."); + } + + const pubsubTopics = decoders.map((v) => v.pubsubTopic); + const contentTopics = decoders.map((v) => v.contentTopic); + + // doing this for simplicity, we can enable unsubscribing with more than one PubsubTopic at once later when requested + if (!this.isSamePubsubTopic(decoders)) { + throw Error( + `Cannot unsubscribe with more than one pubsub topic at the same time, got pubsubTopics:${pubsubTopics}` + ); + } + + log.info( + `Unsubscribing from content topic: ${contentTopics}, pubsub topic: ${pubsubTopics}` + ); + + const supportedPubsubTopic = this.connectionManager.pubsubTopics.includes( + pubsubTopics[0] + ); + if (!supportedPubsubTopic) { + throw Error( + `Pubsub topic ${pubsubTopics[0]} has not been configured on this instance.` + ); + } + + const subscription = this.subscriptions.get(pubsubTopics[0]); + if (!subscription) { + log.warn("No subscriptions associated with the decoder."); + return false; + } + + const result = await subscription.remove(decoders); + + if (subscription.isEmpty()) { + log.warn("Subscription has no decoders anymore, terminating it."); + subscription.stop(); + this.subscriptions.delete(pubsubTopics[0]); + } + + log.info( + `Unsubscribing ${result ? "successful" : "failed"} for content topic: ${contentTopics}` + ); + + return result; + } + + private async onIncomingMessage( + pubsubTopic: string, + message: WakuMessage, + peerId: string + ): Promise { + log.info( + `Received message for pubsubTopic:${pubsubTopic}, contentTopic:${message.contentTopic}, peerId:${peerId.toString()}` + ); + + const subscription = this.subscriptions.get(pubsubTopic); + + if (!subscription) { + log.error(`No subscription locally registered for topic ${pubsubTopic}`); + return; + } + + subscription.invoke(message, peerId); + } + + private isSamePubsubTopic( + decoders: IDecoder[] + ): boolean { + const topics = new Set(); + + for (const decoder of decoders) { + topics.add(decoder.pubsubTopic); + } + + return topics.size === 1; + } +} diff --git a/packages/sdk/src/filter_next/index.ts b/packages/sdk/src/filter_next/index.ts new file mode 100644 index 0000000000..d2d17d15fd --- /dev/null +++ b/packages/sdk/src/filter_next/index.ts @@ -0,0 +1 @@ +export { Filter as NextFilter } from "./filter.js"; diff --git a/packages/sdk/src/filter_next/subscription.spec.ts b/packages/sdk/src/filter_next/subscription.spec.ts new file mode 100644 index 0000000000..e0d79e2736 --- /dev/null +++ b/packages/sdk/src/filter_next/subscription.spec.ts @@ -0,0 +1,239 @@ +import type { PeerId } from "@libp2p/interface"; +import { FilterCore } from "@waku/core"; +import type { + IDecodedMessage, + IDecoder, + Libp2p, + NextFilterOptions +} from "@waku/interfaces"; +import { WakuMessage } from "@waku/proto"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { PeerManager } from "../peer_manager/index.js"; + +import { Subscription } from "./subscription.js"; + +const PUBSUB_TOPIC = "/waku/2/rs/1/4"; +const CONTENT_TOPIC = "/test/1/waku-filter/utf8"; + +describe("Filter Subscription", () => { + let libp2p: Libp2p; + let filterCore: FilterCore; + let peerManager: PeerManager; + let subscription: Subscription; + let decoder: IDecoder; + let config: NextFilterOptions; + + beforeEach(() => { + libp2p = mockLibp2p(); + filterCore = mockFilterCore(); + peerManager = mockPeerManager(); + config = { + numPeersToUse: 2, + pingsBeforePeerRenewed: 3, + keepAliveIntervalMs: 60_000 + }; + + subscription = new Subscription({ + pubsubTopic: PUBSUB_TOPIC, + libp2p, + protocol: filterCore, + config, + peerManager + }); + + decoder = mockDecoder(); + }); + + afterEach(() => { + sinon.restore(); + }); + + it("should be empty when created", () => { + expect(subscription.isEmpty()).to.be.true; + }); + + it("should not be empty after adding a subscription", async () => { + const attemptSubscribeSpy = sinon + .stub(subscription as any, "attemptSubscribe") + .resolves(true); + + const callback = sinon.spy(); + await subscription.add(decoder, callback); + + expect(subscription.isEmpty()).to.be.false; + expect(attemptSubscribeSpy.calledOnce).to.be.true; + }); + + it("should be empty after removing the only subscription", async () => { + const attemptSubscribeSpy = sinon + .stub(subscription as any, "attemptSubscribe") + .resolves(true); + const attemptUnsubscribeSpy = sinon + .stub(subscription as any, "attemptUnsubscribe") + .resolves(true); + + const callback = sinon.spy(); + await subscription.add(decoder, callback); + await subscription.remove(decoder); + + expect(subscription.isEmpty()).to.be.true; + expect(attemptSubscribeSpy.calledOnce).to.be.true; + expect(attemptUnsubscribeSpy.calledOnce).to.be.true; + }); + + it("should invoke callbacks when receiving a message", async () => { + const testContentTopic = "/custom/content/topic"; + const testDecoder = { + pubsubTopic: PUBSUB_TOPIC, + contentTopic: testContentTopic, + fromProtoObj: sinon.stub().callsFake(() => { + return Promise.resolve({ payload: new Uint8Array([1, 2, 3]) }); + }) + }; + + const callback = sinon.spy(); + const message = { + contentTopic: testContentTopic + } as WakuMessage; + + sinon.stub(subscription as any, "attemptSubscribe").resolves(true); + await subscription.add(testDecoder as any, callback); + + subscription.invoke(message, "peer1"); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(callback.called).to.be.true; + expect(testDecoder.fromProtoObj.called).to.be.true; + expect(callback.callCount).to.eq(1); + }); + + it("should invoke callbacks only when newly receiving message is given", async () => { + const testContentTopic = "/custom/content/topic"; + const testDecoder = { + pubsubTopic: PUBSUB_TOPIC, + contentTopic: testContentTopic, + fromProtoObj: sinon.stub().callsFake(() => { + return Promise.resolve({ payload: new Uint8Array([1, 2, 3]) }); + }) + }; + + const callback = sinon.spy(); + const message = { + contentTopic: testContentTopic + } as WakuMessage; + + sinon.stub(subscription as any, "attemptSubscribe").resolves(true); + await subscription.add(testDecoder as any, callback); + + subscription.invoke(message, "peer1"); + await new Promise((resolve) => setTimeout(resolve, 50)); + + subscription.invoke(message, "peer2"); + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(callback.called).to.be.true; + expect(testDecoder.fromProtoObj.called).to.be.true; + expect(callback.callCount).to.eq(1); + }); + + it("should start and setup intervals and event listeners", () => { + const attemptSubscribeSpy = sinon + .stub(subscription as any, "attemptSubscribe") + .resolves(true); + const setupSubscriptionIntervalSpy = sinon.spy( + subscription as any, + "setupSubscriptionInterval" + ); + const setupKeepAliveIntervalSpy = sinon.spy( + subscription as any, + "setupKeepAliveInterval" + ); + const setupEventListenersSpy = sinon.spy( + subscription as any, + "setupEventListeners" + ); + + subscription.start(); + + expect(attemptSubscribeSpy.calledOnce).to.be.true; + expect(setupSubscriptionIntervalSpy.calledOnce).to.be.true; + expect(setupKeepAliveIntervalSpy.calledOnce).to.be.true; + expect(setupEventListenersSpy.calledOnce).to.be.true; + }); + + it("should stop and cleanup resources", () => { + const disposeEventListenersSpy = sinon.spy( + subscription as any, + "disposeEventListeners" + ); + const disposeIntervalsSpy = sinon.spy( + subscription as any, + "disposeIntervals" + ); + const disposePeersSpy = sinon + .stub(subscription as any, "disposePeers") + .resolves(); + const disposeHandlersSpy = sinon.spy( + subscription as any, + "disposeHandlers" + ); + + sinon.stub(subscription as any, "attemptSubscribe").resolves(true); + subscription.start(); + + subscription.stop(); + + expect(disposeEventListenersSpy.calledOnce).to.be.true; + expect(disposeIntervalsSpy.calledOnce).to.be.true; + expect(disposePeersSpy.calledOnce).to.be.true; + expect(disposeHandlersSpy.calledOnce).to.be.true; + }); +}); + +function mockLibp2p(): Libp2p { + return { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub(), + handle: sinon.stub().resolves(), + components: { + events: { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub() + }, + connectionManager: { + getConnections: sinon.stub().returns([]) + } + } + } as unknown as Libp2p; +} + +function mockFilterCore(): FilterCore { + return { + subscribe: sinon.stub().resolves(true), + unsubscribe: sinon.stub().resolves(true), + ping: sinon.stub().resolves(true) + } as unknown as FilterCore; +} + +function mockPeerManager(): PeerManager { + return { + getPeers: sinon.stub().returns([mockPeerId("peer1"), mockPeerId("peer2")]) + } as unknown as PeerManager; +} + +function mockPeerId(id: string): PeerId { + return { + toString: () => id + } as unknown as PeerId; +} + +function mockDecoder(): IDecoder { + return { + pubsubTopic: PUBSUB_TOPIC, + contentTopic: CONTENT_TOPIC, + fromProtoObj: sinon.stub().resolves(undefined) + } as unknown as IDecoder; +} diff --git a/packages/sdk/src/filter_next/subscription.ts b/packages/sdk/src/filter_next/subscription.ts new file mode 100644 index 0000000000..7e6aecd0e1 --- /dev/null +++ b/packages/sdk/src/filter_next/subscription.ts @@ -0,0 +1,593 @@ +import { + type EventHandler, + type PeerId, + TypedEventEmitter +} from "@libp2p/interface"; +import { FilterCore } from "@waku/core"; +import type { + Callback, + NextFilterOptions as FilterOptions, + IDecodedMessage, + IDecoder, + IProtoMessage, + Libp2p +} from "@waku/interfaces"; +import { messageHashStr } from "@waku/message-hash"; +import { WakuMessage } from "@waku/proto"; +import { Logger } from "@waku/utils"; + +import { PeerManager } from "../peer_manager/index.js"; + +import { SubscriptionEvents, SubscriptionParams } from "./types.js"; +import { TTLSet } from "./utils.js"; + +const log = new Logger("sdk:filter-subscription"); + +type AttemptSubscribeParams = { + useNewContentTopics: boolean; + useOnlyNewPeers?: boolean; +}; + +type AttemptUnsubscribeParams = { + useNewContentTopics: boolean; +}; + +export class Subscription { + private readonly libp2p: Libp2p; + private readonly pubsubTopic: string; + private readonly protocol: FilterCore; + private readonly peerManager: PeerManager; + + private readonly config: FilterOptions; + + private isStarted: boolean = false; + private inProgress: boolean = false; + + private peers = new Set(); + private peerFailures = new Map(); + + private readonly receivedMessages = new TTLSet(60_000); + + private callbacks = new Map< + IDecoder, + EventHandler> + >(); + private messageEmitter = new TypedEventEmitter(); + + private toSubscribeContentTopics = new Set(); + private toUnsubscribeContentTopics = new Set(); + + private subscribeIntervalId: number | null = null; + private keepAliveIntervalId: number | null = null; + + private get contentTopics(): string[] { + const allTopics = Array.from(this.callbacks.keys()).map( + (k) => k.contentTopic + ); + const uniqueTopics = new Set(allTopics).values(); + + return Array.from(uniqueTopics); + } + + public constructor(params: SubscriptionParams) { + this.config = params.config; + this.pubsubTopic = params.pubsubTopic; + + this.libp2p = params.libp2p; + this.protocol = params.protocol; + this.peerManager = params.peerManager; + + this.onPeerConnected = this.onPeerConnected.bind(this); + this.onPeerDisconnected = this.onPeerDisconnected.bind(this); + } + + public start(): void { + log.info(`Starting subscription for pubsubTopic: ${this.pubsubTopic}`); + + if (this.isStarted || this.inProgress) { + log.info("Subscription already started or in progress, skipping start"); + return; + } + + this.inProgress = true; + + void this.attemptSubscribe({ + useNewContentTopics: false + }); + this.setupSubscriptionInterval(); + this.setupKeepAliveInterval(); + this.setupEventListeners(); + + this.isStarted = true; + this.inProgress = false; + + log.info(`Subscription started for pubsubTopic: ${this.pubsubTopic}`); + } + + public stop(): void { + log.info(`Stopping subscription for pubsubTopic: ${this.pubsubTopic}`); + + if (!this.isStarted || this.inProgress) { + log.info("Subscription not started or stop in progress, skipping stop"); + return; + } + + this.inProgress = true; + + this.disposeEventListeners(); + this.disposeIntervals(); + void this.disposePeers(); + this.disposeHandlers(); + this.receivedMessages.dispose(); + + this.inProgress = false; + this.isStarted = false; + + log.info(`Subscription stopped for pubsubTopic: ${this.pubsubTopic}`); + } + + public isEmpty(): boolean { + return this.callbacks.size === 0; + } + + public async add( + decoder: IDecoder | IDecoder[], + callback: Callback + ): Promise { + const decoders = Array.isArray(decoder) ? decoder : [decoder]; + + for (const decoder of decoders) { + this.addSingle(decoder, callback); + } + + return this.toSubscribeContentTopics.size > 0 + ? await this.attemptSubscribe({ useNewContentTopics: true }) + : true; // if content topic is not new - subscription, most likely exists + } + + public async remove( + decoder: IDecoder | IDecoder[] + ): Promise { + const decoders = Array.isArray(decoder) ? decoder : [decoder]; + + for (const decoder of decoders) { + this.removeSingle(decoder); + } + + return this.toUnsubscribeContentTopics.size > 0 + ? await this.attemptUnsubscribe({ useNewContentTopics: true }) + : true; // no need to unsubscribe if there are other decoders on the contentTopic + } + + public invoke(message: WakuMessage, _peerId: string): void { + if (this.isMessageReceived(message)) { + log.info( + `Skipping invoking callbacks for already received message: pubsubTopic:${this.pubsubTopic}, peerId:${_peerId.toString()}, contentTopic:${message.contentTopic}` + ); + return; + } + + log.info(`Invoking message for contentTopic: ${message.contentTopic}`); + + this.messageEmitter.dispatchEvent( + new CustomEvent(message.contentTopic, { + detail: message + }) + ); + } + + private addSingle( + decoder: IDecoder, + callback: Callback + ): void { + log.info(`Adding subscription for contentTopic: ${decoder.contentTopic}`); + + const isNewContentTopic = !this.contentTopics.includes( + decoder.contentTopic + ); + + if (isNewContentTopic) { + this.toSubscribeContentTopics.add(decoder.contentTopic); + } + + if (this.callbacks.has(decoder)) { + log.warn( + `Replacing callback associated associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}` + ); + + const callback = this.callbacks.get(decoder); + this.callbacks.delete(decoder); + this.messageEmitter.removeEventListener(decoder.contentTopic, callback); + } + + const eventHandler = (event: CustomEvent): void => { + void (async (): Promise => { + try { + const message = await decoder.fromProtoObj( + decoder.pubsubTopic, + event.detail as IProtoMessage + ); + void callback(message!); + } catch (err) { + log.error("Error decoding message", err); + } + })(); + }; + + this.callbacks.set(decoder, eventHandler); + this.messageEmitter.addEventListener(decoder.contentTopic, eventHandler); + + log.info( + `Subscription added for contentTopic: ${decoder.contentTopic}, isNewContentTopic: ${isNewContentTopic}` + ); + } + + private removeSingle(decoder: IDecoder): void { + log.info(`Removing subscription for contentTopic: ${decoder.contentTopic}`); + + const callback = this.callbacks.get(decoder); + + if (!callback) { + log.warn( + `No callback associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}` + ); + } + + this.callbacks.delete(decoder); + this.messageEmitter.removeEventListener(decoder.contentTopic, callback); + + const isCompletelyRemoved = !this.contentTopics.includes( + decoder.contentTopic + ); + + if (isCompletelyRemoved) { + this.toUnsubscribeContentTopics.add(decoder.contentTopic); + } + + log.info( + `Subscription removed for contentTopic: ${decoder.contentTopic}, isCompletelyRemoved: ${isCompletelyRemoved}` + ); + } + + private isMessageReceived(message: WakuMessage): boolean { + try { + const messageHash = messageHashStr( + this.pubsubTopic, + message as IProtoMessage + ); + + if (this.receivedMessages.has(messageHash)) { + return true; + } + + this.receivedMessages.add(messageHash); + } catch (e) { + // do nothing on throw, message will be handled as not received + } + + return false; + } + + private setupSubscriptionInterval(): void { + const subscriptionRefreshIntervalMs = 1000; + + log.info( + `Setting up subscription interval with period ${subscriptionRefreshIntervalMs}ms` + ); + + this.subscribeIntervalId = setInterval(() => { + const run = async (): Promise => { + if (this.toSubscribeContentTopics.size > 0) { + log.info( + `Subscription interval: ${this.toSubscribeContentTopics.size} topics to subscribe` + ); + void (await this.attemptSubscribe({ useNewContentTopics: true })); + } + + if (this.toUnsubscribeContentTopics.size > 0) { + log.info( + `Subscription interval: ${this.toUnsubscribeContentTopics.size} topics to unsubscribe` + ); + void (await this.attemptUnsubscribe({ useNewContentTopics: true })); + } + }; + + void run(); + }, subscriptionRefreshIntervalMs) as unknown as number; + } + + private setupKeepAliveInterval(): void { + log.info( + `Setting up keep-alive interval with period ${this.config.keepAliveIntervalMs}ms` + ); + + this.keepAliveIntervalId = setInterval(() => { + const run = async (): Promise => { + log.info(`Keep-alive interval running for ${this.peers.size} peers`); + + let peersToReplace = await Promise.all( + Array.from(this.peers.values()).map( + async (peer): Promise => { + const response = await this.protocol.ping(peer); + + if (response.success) { + log.info(`Ping successful for peer: ${peer.toString()}`); + this.peerFailures.set(peer, 0); + return; + } + + let failures = this.peerFailures.get(peer) || 0; + failures += 1; + this.peerFailures.set(peer, failures); + + log.warn( + `Ping failed for peer: ${peer.toString()}, failures: ${failures}/${this.config.pingsBeforePeerRenewed}` + ); + + if (failures < this.config.pingsBeforePeerRenewed) { + return; + } + + log.info( + `Peer ${peer.toString()} exceeded max failures (${this.config.pingsBeforePeerRenewed}), will be replaced` + ); + return peer; + } + ) + ); + + peersToReplace = peersToReplace.filter((p) => !!p); + + await Promise.all( + peersToReplace.map((p) => { + this.peers.delete(p as PeerId); + this.peerFailures.delete(p as PeerId); + return this.requestUnsubscribe(p as PeerId, this.contentTopics); + }) + ); + + if (peersToReplace.length > 0) { + log.info(`Replacing ${peersToReplace.length} failed peers`); + + void (await this.attemptSubscribe({ + useNewContentTopics: false, + useOnlyNewPeers: true + })); + } + }; + + void run(); + }, this.config.keepAliveIntervalMs) as unknown as number; + } + + private setupEventListeners(): void { + this.libp2p.addEventListener( + "peer:connect", + (e) => void this.onPeerConnected(e) + ); + this.libp2p.addEventListener( + "peer:disconnect", + (e) => void this.onPeerDisconnected(e) + ); + } + + private disposeIntervals(): void { + if (this.subscribeIntervalId) { + clearInterval(this.subscribeIntervalId); + } + + if (this.keepAliveIntervalId) { + clearInterval(this.keepAliveIntervalId); + } + } + + private disposeHandlers(): void { + for (const [decoder, handler] of this.callbacks.entries()) { + this.messageEmitter.removeEventListener(decoder.contentTopic, handler); + } + this.callbacks.clear(); + } + + private async disposePeers(): Promise { + await this.attemptUnsubscribe({ useNewContentTopics: false }); + + this.peers.clear(); + this.peerFailures = new Map(); + } + + private disposeEventListeners(): void { + this.libp2p.removeEventListener("peer:connect", this.onPeerConnected); + this.libp2p.removeEventListener("peer:disconnect", this.onPeerDisconnected); + } + + private onPeerConnected(event: CustomEvent): void { + log.info(`Peer connected: ${event.detail.toString()}`); + + // skip the peer we already subscribe to + if (this.peers.has(event.detail)) { + log.info(`Peer ${event.detail.toString()} already subscribed, skipping`); + return; + } + + void this.attemptSubscribe({ + useNewContentTopics: false, + useOnlyNewPeers: true + }); + } + + private onPeerDisconnected(event: CustomEvent): void { + log.info(`Peer disconnected: ${event.detail.toString()}`); + + // ignore as the peer is not the one that is in use + if (!this.peers.has(event.detail)) { + log.info( + `Disconnected peer ${event.detail.toString()} not in use, ignoring` + ); + return; + } + + log.info( + `Active peer ${event.detail.toString()} disconnected, removing from peers list` + ); + + this.peers.delete(event.detail); + void this.attemptSubscribe({ + useNewContentTopics: false, + useOnlyNewPeers: true + }); + } + + private async attemptSubscribe( + params: AttemptSubscribeParams + ): Promise { + const { useNewContentTopics, useOnlyNewPeers = false } = params; + + const contentTopics = useNewContentTopics + ? Array.from(this.toSubscribeContentTopics) + : this.contentTopics; + + log.info( + `Attempting to subscribe: useNewContentTopics=${useNewContentTopics}, useOnlyNewPeers=${useOnlyNewPeers}, contentTopics=${contentTopics.length}` + ); + + if (!contentTopics.length) { + log.warn("Requested content topics is an empty array, skipping"); + return false; + } + + const prevPeers = new Set(this.peers); + const peersToAdd = this.peerManager.getPeers(); + for (const peer of peersToAdd) { + if (this.peers.size >= this.config.numPeersToUse) { + break; + } + + this.peers.add(peer); + } + + const peersToUse = useOnlyNewPeers + ? Array.from(this.peers.values()).filter((p) => !prevPeers.has(p)) + : Array.from(this.peers.values()); + + log.info( + `Subscribing with ${peersToUse.length} peers for ${contentTopics.length} content topics` + ); + + if (useOnlyNewPeers && peersToUse.length === 0) { + log.warn(`Requested to use only new peers, but no peers found, skipping`); + return false; + } + + const results = await Promise.all( + peersToUse.map((p) => this.requestSubscribe(p, contentTopics)) + ); + + const successCount = results.filter((r) => r).length; + log.info( + `Subscribe attempts completed: ${successCount}/${results.length} successful` + ); + + if (useNewContentTopics) { + this.toSubscribeContentTopics = new Set(); + } + + return results.some((v) => v); + } + + private async requestSubscribe( + peerId: PeerId, + contentTopics: string[] + ): Promise { + log.info( + `requestSubscribe: pubsubTopic:${this.pubsubTopic}\tcontentTopics:${contentTopics.join(",")}` + ); + + if (!contentTopics.length || !this.pubsubTopic) { + log.warn( + `requestSubscribe: no contentTopics or pubsubTopic provided, not sending subscribe request` + ); + return false; + } + + const response = await this.protocol.subscribe( + this.pubsubTopic, + peerId, + contentTopics + ); + + if (response.failure) { + log.warn( + `requestSubscribe: Failed to subscribe ${this.pubsubTopic} to ${peerId.toString()} with error:${response.failure.error} for contentTopics:${contentTopics}` + ); + return false; + } + + log.info( + `requestSubscribe: Subscribed ${this.pubsubTopic} to ${peerId.toString()} for contentTopics:${contentTopics}` + ); + + return true; + } + + private async attemptUnsubscribe( + params: AttemptUnsubscribeParams + ): Promise { + const { useNewContentTopics } = params; + + const contentTopics = useNewContentTopics + ? Array.from(this.toUnsubscribeContentTopics) + : this.contentTopics; + + log.info( + `Attempting to unsubscribe: useNewContentTopics=${useNewContentTopics}, contentTopics=${contentTopics.length}` + ); + + if (!contentTopics.length) { + log.warn("Requested content topics is an empty array, skipping"); + return false; + } + + const peersToUse = Array.from(this.peers.values()); + const result = await Promise.all( + peersToUse.map((p) => + this.requestUnsubscribe( + p, + useNewContentTopics ? contentTopics : undefined + ) + ) + ); + + const successCount = result.filter((r) => r).length; + log.info( + `Unsubscribe attempts completed: ${successCount}/${result.length} successful` + ); + + if (useNewContentTopics) { + this.toUnsubscribeContentTopics = new Set(); + } + + return result.some((v) => v); + } + + private async requestUnsubscribe( + peerId: PeerId, + contentTopics?: string[] + ): Promise { + const response = contentTopics + ? await this.protocol.unsubscribe(this.pubsubTopic, peerId, contentTopics) + : await this.protocol.unsubscribeAll(this.pubsubTopic, peerId); + + if (response.failure) { + log.warn( + `requestUnsubscribe: Failed to unsubscribe for pubsubTopic:${this.pubsubTopic} from peerId:${peerId.toString()} with error:${response.failure?.error} for contentTopics:${contentTopics}` + ); + return false; + } + + log.info( + `requestUnsubscribe: Unsubscribed pubsubTopic:${this.pubsubTopic} from peerId:${peerId.toString()} for contentTopics:${contentTopics}` + ); + + return true; + } +} diff --git a/packages/sdk/src/filter_next/types.ts b/packages/sdk/src/filter_next/types.ts new file mode 100644 index 0000000000..4cd6e49927 --- /dev/null +++ b/packages/sdk/src/filter_next/types.ts @@ -0,0 +1,25 @@ +import { ConnectionManager } from "@waku/core"; +import { FilterCore } from "@waku/core"; +import type { Libp2p, NextFilterOptions } from "@waku/interfaces"; +import { WakuMessage } from "@waku/proto"; + +import { PeerManager } from "../peer_manager/index.js"; + +export type FilterConstructorParams = { + options?: Partial; + libp2p: Libp2p; + peerManager: PeerManager; + connectionManager: ConnectionManager; +}; + +export type SubscriptionEvents = { + [contentTopic: string]: CustomEvent; +}; + +export type SubscriptionParams = { + libp2p: Libp2p; + pubsubTopic: string; + protocol: FilterCore; + config: NextFilterOptions; + peerManager: PeerManager; +}; diff --git a/packages/sdk/src/filter_next/utils.spec.ts b/packages/sdk/src/filter_next/utils.spec.ts new file mode 100644 index 0000000000..8275f7b344 --- /dev/null +++ b/packages/sdk/src/filter_next/utils.spec.ts @@ -0,0 +1,100 @@ +import { expect } from "chai"; +import sinon from "sinon"; + +import { TTLSet } from "./utils.js"; + +describe("TTLSet", () => { + let clock: sinon.SinonFakeTimers; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + clock.restore(); + sinon.restore(); + }); + + it("should add and check entries correctly", () => { + const ttlSet = new TTLSet(60_000); + + ttlSet.add("test-entry"); + + expect(ttlSet.has("test-entry")).to.be.true; + expect(ttlSet.has("non-existent-entry")).to.be.false; + }); + + it("should support chaining for add method", () => { + const ttlSet = new TTLSet(60_000); + + ttlSet.add("entry1").add("entry2"); + + expect(ttlSet.has("entry1")).to.be.true; + expect(ttlSet.has("entry2")).to.be.true; + }); + + it("should remove expired entries after TTL has passed", () => { + const ttlSet = new TTLSet(1_000, 500); + + ttlSet.add("expiring-entry"); + expect(ttlSet.has("expiring-entry")).to.be.true; + + clock.tick(1_500); + + expect(ttlSet.has("expiring-entry")).to.be.false; + }); + + it("should keep entries that haven't expired yet", () => { + const ttlSet = new TTLSet(2_000, 500); + + ttlSet.add("entry"); + expect(ttlSet.has("entry")).to.be.true; + + clock.tick(1000); + + expect(ttlSet.has("entry")).to.be.true; + }); + + it("should handle different types of entries", () => { + const numberSet = new TTLSet(60_000); + numberSet.add(123); + expect(numberSet.has(123)).to.be.true; + expect(numberSet.has(456)).to.be.false; + + const objectSet = new TTLSet(60_000); + const obj1 = { id: 1 }; + const obj2 = { id: 2 }; + objectSet.add(obj1); + expect(objectSet.has(obj1)).to.be.true; + expect(objectSet.has(obj2)).to.be.false; + }); + + it("should properly clean up resources when disposed", () => { + const ttlSet = new TTLSet(60_000); + const clearIntervalSpy = sinon.spy(global, "clearInterval"); + + ttlSet.add("test-entry"); + ttlSet.dispose(); + + expect(clearIntervalSpy.called).to.be.true; + expect(ttlSet.has("test-entry")).to.be.false; + }); + + it("should continually clean up expired entries at intervals", () => { + const ttlSet = new TTLSet(1_000, 500); + + ttlSet.add("entry1"); + + clock.tick(750); + expect(ttlSet.has("entry1")).to.be.true; + + ttlSet.add("entry2"); + + clock.tick(750); + expect(ttlSet.has("entry1")).to.be.false; + expect(ttlSet.has("entry2")).to.be.true; + + clock.tick(750); + expect(ttlSet.has("entry2")).to.be.false; + }); +}); diff --git a/packages/sdk/src/filter_next/utils.ts b/packages/sdk/src/filter_next/utils.ts new file mode 100644 index 0000000000..00476031ce --- /dev/null +++ b/packages/sdk/src/filter_next/utils.ts @@ -0,0 +1,48 @@ +export class TTLSet { + private readonly ttlMs: number; + private cleanupIntervalId: number | null = null; + private readonly entryTimestamps = new Map(); + + /** + * Creates a new CustomSet with TTL functionality. + * @param ttlMs - The time-to-live in milliseconds for each entry. + * @param cleanupIntervalMs - Optional interval between cleanup operations (default: 5000ms). + */ + public constructor(ttlMs: number, cleanupIntervalMs: number = 5000) { + this.ttlMs = ttlMs; + this.startCleanupInterval(cleanupIntervalMs); + } + + public dispose(): void { + if (this.cleanupIntervalId !== null) { + clearInterval(this.cleanupIntervalId); + this.cleanupIntervalId = null; + } + + this.entryTimestamps.clear(); + } + + public add(entry: T): this { + this.entryTimestamps.set(entry, Date.now()); + return this; + } + + public has(entry: T): boolean { + return this.entryTimestamps.has(entry); + } + + private startCleanupInterval(intervalMs: number): void { + this.cleanupIntervalId = setInterval(() => { + this.removeExpiredEntries(); + }, intervalMs) as unknown as number; + } + + private removeExpiredEntries(): void { + const now = Date.now(); + for (const [entry, timestamp] of this.entryTimestamps.entries()) { + if (now - timestamp > this.ttlMs) { + this.entryTimestamps.delete(entry); + } + } + } +} diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index aff3618823..323f548149 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -15,6 +15,7 @@ import type { IEncoder, IFilter, ILightPush, + INextFilter, IRelay, IStore, IWaku, @@ -26,6 +27,7 @@ import { DefaultNetworkConfig, Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { Filter } from "../filter/index.js"; +import { NextFilter } from "../filter_next/index.js"; import { HealthIndicator } from "../health_indicator/index.js"; import { LightPush } from "../light_push/index.js"; import { PeerManager } from "../peer_manager/index.js"; @@ -51,6 +53,7 @@ export class WakuNode implements IWaku { public relay?: IRelay; public store?: IStore; public filter?: IFilter; + public nextFilter?: INextFilter; public lightPush?: ILightPush; public connectionManager: ConnectionManager; public health: HealthIndicator; @@ -135,6 +138,13 @@ export class WakuNode implements IWaku { lightPush: this.lightPush, options: options.filter }); + + this.nextFilter = new NextFilter({ + libp2p, + connectionManager: this.connectionManager, + peerManager: this.peerManager, + options: options.filter + }); } log.info( diff --git a/packages/tests/tests/filter next/push.node.spec.ts b/packages/tests/tests/filter next/push.node.spec.ts new file mode 100644 index 0000000000..e6bc9f1614 --- /dev/null +++ b/packages/tests/tests/filter next/push.node.spec.ts @@ -0,0 +1,340 @@ +import { LightNode, Protocols } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/sdk"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + delay, + runMultipleNodes, + ServiceNodesFleet, + teardownNodesWithRedundancy, + TEST_STRING, + TEST_TIMESTAMPS +} from "../../src/index.js"; + +import { + messageText, + TestContentTopic, + TestDecoder, + TestEncoder, + TestPubsubTopic, + TestShardInfo +} from "./utils.js"; + +const runTests = (strictCheckNodes: boolean): void => { + describe(`Waku Filter Next: FilterPush: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(10000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + let ctx: Mocha.Context; + + beforeEachCustom(this, async () => { + ctx = this.ctx; + [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo, { + lightpush: true, + filter: true + }); + }); + + afterEachCustom(this, async () => { + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + TEST_STRING.forEach((testItem) => { + it(`Check received message containing ${testItem.description}`, async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(testItem.value) + }); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: testItem.value, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + }); + }); + + TEST_TIMESTAMPS.forEach((testItem) => { + it(`Check received message with timestamp: ${testItem} `, async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.sendRelayMessage( + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: testItem as any + }, + TestPubsubTopic + ); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + checkTimestamp: false, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + + // Check if the timestamp matches + const timestamp = serviceNodes.messageCollector.getMessage(0).timestamp; + if (testItem == undefined) { + expect(timestamp).to.eq(undefined); + } + if (timestamp !== undefined && timestamp instanceof Date) { + expect(testItem?.toString()).to.contain( + timestamp.getTime().toString() + ); + } + }); + }); + + it("Check message with invalid timestamp is not received", async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.sendRelayMessage( + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: "2023-09-06T12:05:38.609Z" as any + }, + TestPubsubTopic + ); + + // Verify that no message was received + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + }); + + it("Check message on other pubsub topic is not received", async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.sendRelayMessage( + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: BigInt(Date.now()) * BigInt(1000000) + }, + "WrongContentTopic" + ); + + expect( + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: TestPubsubTopic + }) + ).to.eq(false); + }); + + it("Check message with no pubsub topic is not received", async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.nodes[0].restCall( + `/relay/v1/messages/`, + "POST", + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: BigInt(Date.now()) * BigInt(1000000) + }, + async (res) => res.status === 200 + ); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + }); + + it("Check message with no content topic is not received", async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.sendRelayMessage( + { + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: BigInt(Date.now()) * BigInt(1000000) + }, + TestPubsubTopic + ); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + }); + + it("Check message with no payload is not received", async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.sendRelayMessage( + { + contentTopic: TestContentTopic, + timestamp: BigInt(Date.now()) * BigInt(1000000), + payload: undefined as any + }, + TestPubsubTopic + ); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + }); + + it("Check message with non string payload is not received", async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.sendRelayMessage( + { + contentTopic: TestContentTopic, + payload: 12345 as unknown as string, + timestamp: BigInt(Date.now()) * BigInt(1000000) + }, + TestPubsubTopic + ); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + }); + + it("Check message received after jswaku node is restarted", async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + + await waku.stop(); + expect(waku.isStarted()).to.eq(false); + await waku.start(); + expect(waku.isStarted()).to.eq(true); + + for (const node of serviceNodes.nodes) { + await waku.dial(await node.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); + } + + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedMessageText: "M2", + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + }); + + it("Check message received after old nwaku nodes are not available and new are created", async function () { + let callback = serviceNodes.messageCollector.callback; + + await waku.nextFilter.subscribe(TestDecoder, (...args) => + callback(...args) + ); + + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + + await teardownNodesWithRedundancy(serviceNodes, []); + serviceNodes = await ServiceNodesFleet.createAndRun( + ctx, + 2, + false, + TestShardInfo, + { + lightpush: true, + filter: true + }, + false + ); + + callback = serviceNodes.messageCollector.callback; + + const peerConnectEvent = new Promise((resolve, reject) => { + waku.libp2p.addEventListener("peer:connect", (e) => { + resolve(e); + }); + setTimeout(() => reject, 1000); + }); + + for (const node of serviceNodes.nodes) { + await waku.dial(await node.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); + } + + await peerConnectEvent; + + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedMessageText: "M2", + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + }); + }); +}; + +[true, false].map(runTests); diff --git a/packages/tests/tests/filter next/subscribe.node.spec.ts b/packages/tests/tests/filter next/subscribe.node.spec.ts new file mode 100644 index 0000000000..0fff4802cf --- /dev/null +++ b/packages/tests/tests/filter next/subscribe.node.spec.ts @@ -0,0 +1,668 @@ +import { createDecoder, createEncoder, DecodedMessage } from "@waku/core"; +import { IDecoder, LightNode } from "@waku/interfaces"; +import { + ecies, + generatePrivateKey, + generateSymmetricKey, + getPublicKey, + symmetric +} from "@waku/message-encryption"; +import { Protocols, utf8ToBytes } from "@waku/sdk"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + delay, + generateTestData, + makeLogFileName, + MessageCollector, + runMultipleNodes, + ServiceNode, + ServiceNodesFleet, + tearDownNodes, + teardownNodesWithRedundancy, + TEST_STRING, + waitForConnections +} from "../../src/index.js"; + +import { + ClusterId, + messagePayload, + messageText, + ShardIndex, + TestContentTopic, + TestDecoder, + TestEncoder, + TestPubsubTopic, + TestShardInfo +} from "./utils.js"; + +const runTests = (strictCheckNodes: boolean): void => { + describe(`Waku Filter Next: Subscribe: Multiple Service Nodes: Strict Check mode: ${strictCheckNodes}`, function () { + this.timeout(100000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + + beforeEachCustom(this, async () => { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + TestShardInfo, + undefined, + strictCheckNodes + ); + }); + + afterEachCustom(this, async () => { + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + it("Subscribe and receive messages via lightPush", async function () { + expect(waku.libp2p.getConnections()).has.length(2); + + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(TestEncoder, messagePayload); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + + await serviceNodes.confirmMessageLength(1); + }); + + it("Subscribe and receive ecies encrypted messages via lightPush", async function () { + const privateKey = generatePrivateKey(); + const publicKey = getPublicKey(privateKey); + const encoder = ecies.createEncoder({ + contentTopic: TestContentTopic, + publicKey, + pubsubTopic: TestPubsubTopic + }); + const decoder = ecies.createDecoder( + TestContentTopic, + privateKey, + TestPubsubTopic + ); + + await waku.nextFilter.subscribe( + decoder, + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(encoder, messagePayload); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic, + expectedVersion: 1, + expectedPubsubTopic: TestPubsubTopic + }); + + await serviceNodes.confirmMessageLength(2); + }); + + it("Subscribe and receive symmetrically encrypted messages via lightPush", async function () { + const symKey = generateSymmetricKey(); + const encoder = symmetric.createEncoder({ + contentTopic: TestContentTopic, + symKey, + pubsubTopic: TestPubsubTopic + }); + const decoder = symmetric.createDecoder( + TestContentTopic, + symKey, + TestPubsubTopic + ); + + await waku.nextFilter.subscribe( + decoder, + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(encoder, messagePayload); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic, + expectedVersion: 1, + expectedPubsubTopic: TestPubsubTopic + }); + + await serviceNodes.confirmMessageLength(2); + }); + + it("Subscribe and receive messages via waku relay post", async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + + await delay(400); + + // Send a test message using the relay post method. + const relayMessage = ServiceNodesFleet.toMessageRpcQuery({ + contentTopic: TestContentTopic, + payload: utf8ToBytes(messageText) + }); + await serviceNodes.sendRelayMessage(relayMessage, TestPubsubTopic); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + + await serviceNodes.confirmMessageLength(1); + }); + + it("Subscribe and receive 2 messages on the same topic", async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(TestEncoder, messagePayload); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + + // Send another message on the same topic. + const newMessageText = "Filtering still works!"; + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(newMessageText) + }); + + // Verify that the second message was successfully received. + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedMessageText: newMessageText, + expectedContentTopic: TestContentTopic + }); + + await serviceNodes.confirmMessageLength(2); + }); + + it("Subscribe and receive messages on 2 different content topics", async function () { + // Subscribe to the first content topic and send a message. + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, messagePayload); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + + // Modify subscription to include a new content topic and send a message. + const newMessageText = "Filtering still works!"; + const newMessagePayload = { payload: utf8ToBytes(newMessageText) }; + const newContentTopic = "/test/2/waku-filter/default"; + const newEncoder = createEncoder({ + contentTopic: newContentTopic, + pubsubTopic: TestPubsubTopic + }); + const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); + await waku.nextFilter.subscribe( + newDecoder, + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(newEncoder, { + payload: utf8ToBytes(newMessageText) + }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedContentTopic: newContentTopic, + expectedMessageText: newMessageText, + expectedPubsubTopic: TestPubsubTopic + }); + + // Send another message on the initial content topic to verify it still works. + await waku.lightPush.send(TestEncoder, newMessagePayload); + expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(2, { + expectedMessageText: newMessageText, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + + await serviceNodes.confirmMessageLength(3); + }); + + it("Subscribe and receives messages on 20 topics", async function () { + const topicCount = 20; + const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); + + // Subscribe to all 20 topics. + for (let i = 0; i < topicCount; i++) { + await waku.nextFilter.subscribe( + td.decoders[i], + serviceNodes.messageCollector.callback + ); + } + + // Send a unique message on each topic. + for (let i = 0; i < topicCount; i++) { + await waku.lightPush.send(td.encoders[i], { + payload: utf8ToBytes(`Message for Topic ${i + 1}`) + }); + } + + // Verify that each message was received on the corresponding topic. + expect(await serviceNodes.messageCollector.waitForMessages(20)).to.eq( + true + ); + td.contentTopics.forEach((topic, index) => { + serviceNodes.messageCollector.verifyReceivedMessage(index, { + expectedContentTopic: topic, + expectedMessageText: `Message for Topic ${index + 1}`, + expectedPubsubTopic: TestPubsubTopic + }); + }); + }); + + // skip for now, will be enabled once old Filter is removed as it exausts amount of streams avaialble + it.skip("Subscribe to 30 topics in separate streams (30 streams for Filter is limit) at once and receives messages", async function () { + this.timeout(100_000); + const topicCount = 30; + const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); + + for (let i = 0; i < topicCount; i++) { + await waku.nextFilter.subscribe( + td.decoders[i], + serviceNodes.messageCollector.callback + ); + } + + // Send a unique message on each topic. + for (let i = 0; i < topicCount; i++) { + await waku.lightPush.send(td.encoders[i], { + payload: utf8ToBytes(`Message for Topic ${i + 1}`) + }); + } + + // Verify that each message was received on the corresponding topic. + expect( + await serviceNodes.messageCollector.waitForMessages(topicCount) + ).to.eq(true); + td.contentTopics.forEach((topic, index) => { + serviceNodes.messageCollector.verifyReceivedMessage(index, { + expectedContentTopic: topic, + expectedMessageText: `Message for Topic ${index + 1}`, + expectedPubsubTopic: TestPubsubTopic + }); + }); + }); + + it("Subscribe to 100 topics (new limit) at once and receives messages", async function () { + this.timeout(100_000); + const topicCount = 100; + const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); + + await waku.nextFilter.subscribe( + td.decoders, + serviceNodes.messageCollector.callback + ); + + // Send a unique message on each topic. + for (let i = 0; i < topicCount; i++) { + await waku.lightPush.send(td.encoders[i], { + payload: utf8ToBytes(`Message for Topic ${i + 1}`) + }); + } + + // Verify that each message was received on the corresponding topic. + expect( + await serviceNodes.messageCollector.waitForMessages(topicCount) + ).to.eq(true); + td.contentTopics.forEach((topic, index) => { + serviceNodes.messageCollector.verifyReceivedMessage(index, { + expectedContentTopic: topic, + expectedMessageText: `Message for Topic ${index + 1}`, + expectedPubsubTopic: TestPubsubTopic + }); + }); + }); + + it("Error when try to subscribe to more than 101 topics (new limit)", async function () { + const topicCount = 101; + const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); + + try { + await waku.nextFilter.subscribe( + td.decoders, + serviceNodes.messageCollector.callback + ); + } catch (err) { + if ( + err instanceof Error && + err.message.includes( + `exceeds maximum content topics: ${topicCount - 1}` + ) + ) { + return; + } else { + throw err; + } + } + }); + + it("Overlapping topic subscription", async function () { + // Define two sets of test data with overlapping topics. + const topicCount1 = 2; + const td1 = generateTestData(topicCount1, { + pubsubTopic: TestPubsubTopic + }); + + const topicCount2 = 4; + const td2 = generateTestData(topicCount2, { + pubsubTopic: TestPubsubTopic + }); + + await waku.nextFilter.subscribe( + td1.decoders, + serviceNodes.messageCollector.callback + ); + + // Subscribe to the second set of topics which has overlapping topics with the first set. + await waku.nextFilter.subscribe( + td2.decoders, + serviceNodes.messageCollector.callback + ); + + // Send messages to the first set of topics. + for (let i = 0; i < topicCount1; i++) { + const messageText = `Topic Set 1: Message Number: ${i + 1}`; + await waku.lightPush.send(td1.encoders[i], { + payload: utf8ToBytes(messageText) + }); + } + + // Send messages to the second set of topics. + for (let i = 0; i < topicCount2; i++) { + const messageText = `Topic Set 2: Message Number: ${i + 1}`; + await waku.lightPush.send(td2.encoders[i], { + payload: utf8ToBytes(messageText) + }); + } + + // Since there are overlapping topics, there should be 10 messages in total because overlaping decoders handle them + expect( + await serviceNodes.messageCollector.waitForMessages(10, { exact: true }) + ).to.eq(true); + }); + + it("Refresh subscription", async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + + // Resubscribe (refresh) to the same topic and send another message. + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + + // Confirm both messages were received. + expect( + await serviceNodes.messageCollector.waitForMessages(2, { exact: true }) + ).to.eq(true); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedMessageText: "M2", + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + }); + + TEST_STRING.forEach((testItem) => { + it(`Subscribe to topic containing ${testItem.description} and receive message`, async function () { + const newContentTopic = testItem.value; + const newEncoder = waku.createEncoder({ + contentTopic: newContentTopic, + shardInfo: { + clusterId: ClusterId, + shard: ShardIndex + } + }); + const newDecoder = waku.createDecoder({ + contentTopic: newContentTopic, + shardInfo: { + clusterId: ClusterId, + shard: ShardIndex + } + }); + + await waku.nextFilter.subscribe( + newDecoder as IDecoder, + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(newEncoder, messagePayload); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: newContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + }); + }); + + it("Add multiple subscription objects on single nwaku node", async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + + const newContentTopic = "/test/2/waku-filter/default"; + const newEncoder = createEncoder({ + contentTopic: newContentTopic, + pubsubTopic: TestPubsubTopic + }); + const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); + await waku.nextFilter.subscribe( + newDecoder, + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); + + // Check if both messages were received + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestPubsubTopic + }); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedContentTopic: newContentTopic, + expectedMessageText: "M2", + expectedPubsubTopic: TestPubsubTopic + }); + }); + + it("Renews subscription after lossing a connection", async function () { + // setup check + expect(waku.libp2p.getConnections()).has.length(2); + + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(TestEncoder, messagePayload); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + + await serviceNodes.confirmMessageLength(1); + + // check renew logic + const nwakuPeers = await Promise.all( + serviceNodes.nodes.map((v) => v.getMultiaddrWithId()) + ); + await Promise.all(nwakuPeers.map((v) => waku.libp2p.hangUp(v))); + + expect(waku.libp2p.getConnections().length).eq(0); + + await Promise.all(nwakuPeers.map((v) => waku.libp2p.dial(v))); + await waitForConnections(nwakuPeers.length, waku); + + const testText = "second try"; + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(testText) + }); + + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedMessageText: testText, + expectedContentTopic: TestContentTopic + }); + }); + + it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + + // Set up and start a new nwaku node with customPubsubTopic1 + const nwaku2 = new ServiceNode(makeLogFileName(this) + "3"); + + try { + const customContentTopic = "/test/4/waku-filter/default"; + const customDecoder = createDecoder(customContentTopic, { + clusterId: ClusterId, + shard: 4 + }); + const customEncoder = createEncoder({ + contentTopic: customContentTopic, + pubsubTopicShardInfo: { clusterId: ClusterId, shard: 4 } + }); + + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + clusterId: ClusterId, + shard: [4] + }); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); + + await nwaku2.ensureSubscriptions([customDecoder.pubsubTopic]); + + const messageCollector2 = new MessageCollector(); + + await waku.nextFilter.subscribe( + customDecoder, + messageCollector2.callback + ); + + // Making sure that messages are send and reveiced for both subscriptions + // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 + while ( + !(await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: TestDecoder.pubsubTopic + })) || + !(await messageCollector2.waitForMessages(1, { + pubsubTopic: customDecoder.pubsubTopic + })) + ) { + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes("M1") + }); + await waku.lightPush.send(customEncoder, { + payload: utf8ToBytes("M2") + }); + } + + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: TestDecoder.contentTopic, + expectedPubsubTopic: TestDecoder.pubsubTopic, + expectedMessageText: "M1" + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedContentTopic: customDecoder.contentTopic, + expectedPubsubTopic: customDecoder.pubsubTopic, + expectedMessageText: "M2" + }); + } catch (e) { + await tearDownNodes([nwaku2], []); + } + }); + + it("Should fail to subscribe with decoder with wrong shard", async function () { + const wrongDecoder = createDecoder(TestDecoder.contentTopic, { + clusterId: ClusterId, + shard: 5 + }); + + // this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2` + try { + await waku.nextFilter.subscribe( + wrongDecoder, + serviceNodes.messageCollector.callback + ); + } catch (error) { + expect((error as Error).message).to.include( + `Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.` + ); + } + }); + }); +}; + +[true, false].map((strictCheckNodes) => runTests(strictCheckNodes)); diff --git a/packages/tests/tests/filter next/unsubscribe.node.spec.ts b/packages/tests/tests/filter next/unsubscribe.node.spec.ts new file mode 100644 index 0000000000..ef6e9bf93e --- /dev/null +++ b/packages/tests/tests/filter next/unsubscribe.node.spec.ts @@ -0,0 +1,214 @@ +import { createDecoder, createEncoder } from "@waku/core"; +import { type LightNode } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/sdk"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + generateTestData, + runMultipleNodes, + ServiceNodesFleet, + teardownNodesWithRedundancy +} from "../../src/index.js"; + +import { + ClusterId, + messagePayload, + messageText, + TestContentTopic, + TestDecoder, + TestEncoder, + TestPubsubTopic +} from "./utils.js"; + +const runTests = (strictCheckNodes: boolean): void => { + describe(`Waku Filter Next: Unsubscribe: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(10000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + + beforeEachCustom(this, async () => { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { + contentTopics: [TestContentTopic], + clusterId: ClusterId + }, + { filter: true, lightpush: true } + ); + }); + + afterEachCustom(this, async () => { + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () { + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(TestEncoder, messagePayload); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + + await waku.nextFilter.unsubscribe(TestDecoder); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + false + ); + + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + expect(serviceNodes.messageCollector.count).to.eq(1); + + await serviceNodes.confirmMessageLength(2); + }); + + it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () { + // Subscribe to 2 topics and send messages + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + + const newContentTopic = "/test/2/waku-filter"; + const newEncoder = createEncoder({ + contentTopic: newContentTopic, + pubsubTopic: TestPubsubTopic + }); + const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); + await waku.nextFilter.subscribe( + newDecoder, + serviceNodes.messageCollector.callback + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + + // Unsubscribe from the first topic and send again + await waku.nextFilter.unsubscribe(TestDecoder); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") }); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") }); + expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq( + true + ); + + // Check that from 4 messages send 3 were received + expect(serviceNodes.messageCollector.count).to.eq(3); + await serviceNodes.confirmMessageLength(4); + }); + + it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () { + // Subscribe to 2 topics and send messages + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + const newContentTopic = "/test/2/waku-filter"; + const newEncoder = createEncoder({ + contentTopic: newContentTopic, + pubsubTopic: TestPubsubTopic + }); + const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); + await waku.nextFilter.subscribe( + newDecoder, + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + + // Unsubscribe from both and send again + await waku.nextFilter.unsubscribe(TestDecoder); + await waku.nextFilter.unsubscribe(newDecoder); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") }); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") }); + expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq( + false + ); + + // Check that from 4 messages send 2 were received + expect(serviceNodes.messageCollector.count).to.eq(2); + await serviceNodes.confirmMessageLength(4); + }); + + it("Unsubscribe topics the node is not subscribed to", async function () { + // Subscribe to 1 topic and send message + await waku.nextFilter.subscribe( + TestDecoder, + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + + expect(serviceNodes.messageCollector.count).to.eq(1); + + // Unsubscribe from topics that the node is not not subscribed to and send again + await waku.nextFilter.unsubscribe( + createDecoder("/test/2/waku-filter", TestDecoder.pubsubTopic) + ); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + + // Check that both messages were received + expect(serviceNodes.messageCollector.count).to.eq(2); + await serviceNodes.confirmMessageLength(2); + }); + + it("Unsubscribe from 100 topics (new limit) at once and receives messages", async function () { + this.timeout(100_000); + const topicCount = 100; + const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); + + await waku.nextFilter.subscribe( + td.decoders, + serviceNodes.messageCollector.callback + ); + + for (let i = 0; i < topicCount; i++) { + await waku.lightPush.send(td.encoders[i], { + payload: utf8ToBytes(`Message for Topic ${i + 1}`) + }); + } + + expect( + await serviceNodes.messageCollector.waitForMessages(topicCount) + ).to.eq(true); + td.contentTopics.forEach((topic, index) => { + serviceNodes.messageCollector.verifyReceivedMessage(index, { + expectedContentTopic: topic, + expectedMessageText: `Message for Topic ${index + 1}`, + expectedPubsubTopic: TestPubsubTopic + }); + }); + + await waku.nextFilter.unsubscribe(td.decoders); + + for (let i = 0; i < topicCount; i++) { + await waku.lightPush.send(td.encoders[i], { + payload: utf8ToBytes(`Message for Topic ${i + 1}`) + }); + } + + expect(serviceNodes.messageCollector.count).to.eq(100); + }); + }); +}; + +[true, false].map(runTests); diff --git a/packages/tests/tests/filter next/utils.ts b/packages/tests/tests/filter next/utils.ts new file mode 100644 index 0000000000..86e8ef2ce0 --- /dev/null +++ b/packages/tests/tests/filter next/utils.ts @@ -0,0 +1,166 @@ +import { createDecoder, createEncoder } from "@waku/core"; +import { + CreateNodeOptions, + DefaultNetworkConfig, + ISubscription, + IWaku, + LightNode, + NetworkConfig, + Protocols +} from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { + contentTopicToPubsubTopic, + contentTopicToShardIndex, + derivePubsubTopicsFromNetworkConfig, + Logger +} from "@waku/utils"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { Context } from "mocha"; +import pRetry from "p-retry"; + +import { + NOISE_KEY_1, + ServiceNodesFleet, + waitForConnections +} from "../../src/index.js"; + +// Constants for test configuration. +export const log = new Logger("test:filter"); +export const TestContentTopic = "/test/1/waku-filter/default"; +export const ClusterId = 2; +export const ShardIndex = contentTopicToShardIndex(TestContentTopic); +export const TestShardInfo = { + contentTopics: [TestContentTopic], + clusterId: ClusterId +}; +export const TestPubsubTopic = contentTopicToPubsubTopic( + TestContentTopic, + ClusterId +); +export const TestEncoder = createEncoder({ + contentTopic: TestContentTopic, + pubsubTopic: TestPubsubTopic +}); +export const TestDecoder = createDecoder(TestContentTopic, TestPubsubTopic); +export const messageText = "Filtering works!"; +export const messagePayload = { payload: utf8ToBytes(messageText) }; + +// Utility to validate errors related to pings in the subscription. +export async function validatePingError( + subscription: ISubscription +): Promise { + try { + const { failures, successes } = await subscription.ping(); + if (failures.length === 0 || successes.length > 0) { + throw new Error( + "Ping was successful but was expected to fail with a specific error." + ); + } + } catch (err) { + if ( + err instanceof Error && + err.message.includes("peer has no subscriptions") + ) { + return; + } else { + throw err; + } + } +} + +export async function runMultipleNodes( + context: Context, + networkConfig: NetworkConfig = DefaultNetworkConfig, + strictChecking: boolean = false, + numServiceNodes = 3, + withoutFilter = false +): Promise<[ServiceNodesFleet, LightNode]> { + const pubsubTopics = derivePubsubTopicsFromNetworkConfig(networkConfig); + // create numServiceNodes nodes + const serviceNodes = await ServiceNodesFleet.createAndRun( + context, + numServiceNodes, + strictChecking, + networkConfig, + undefined, + withoutFilter + ); + + const wakuOptions: CreateNodeOptions = { + staticNoiseKey: NOISE_KEY_1, + libp2p: { + addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } + } + }; + + log.info("Starting js waku node with :", JSON.stringify(wakuOptions)); + let waku: LightNode | undefined; + try { + waku = await createLightNode(wakuOptions); + await waku.start(); + } catch (error) { + log.error("jswaku node failed to start:", error); + } + + if (!waku) { + throw new Error("Failed to initialize waku"); + } + + for (const node of serviceNodes.nodes) { + await waku.dial(await node.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); + await node.ensureSubscriptions(pubsubTopics); + + const wakuConnections = waku.libp2p.getConnections(); + + if (wakuConnections.length < 1) { + throw new Error(`Expected at least 1 connection for js-waku.`); + } + + await node.waitForLog(waku.libp2p.peerId.toString(), 100); + } + + await waitForConnections(numServiceNodes, waku); + + return [serviceNodes, waku]; +} + +export async function teardownNodesWithRedundancy( + serviceNodes: ServiceNodesFleet, + wakuNodes: IWaku | IWaku[] +): Promise { + const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes]; + + const stopNwakuNodes = serviceNodes.nodes.map(async (node) => { + await pRetry( + async () => { + try { + await node.stop(); + } catch (error) { + log.error("Service Node failed to stop:", error); + throw error; + } + }, + { retries: 3 } + ); + }); + + const stopWakuNodes = wNodes.map(async (waku) => { + if (waku) { + await pRetry( + async () => { + try { + await waku.stop(); + } catch (error) { + log.error("Waku failed to stop:", error); + throw error; + } + }, + { retries: 3 } + ); + } + }); + + await Promise.all([...stopNwakuNodes, ...stopWakuNodes]); +}