diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 2b770b30c1..67946a02a9 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -1,15 +1,14 @@ import type { PeerId } from "@libp2p/interface"; import type { IDecodedMessage, IDecoder } from "./message.js"; -import type { ContentTopic, PubsubTopic, ThisOrThat } from "./misc.js"; +import type { ContentTopic, ThisOrThat } from "./misc.js"; import type { Callback, IBaseProtocolCore, IBaseProtocolSDK, ProtocolError, ProtocolUseOptions, - SDKProtocolResult, - ShardingParams + SDKProtocolResult } from "./protocols.js"; import type { IReceiver } from "./receiver.js"; @@ -37,12 +36,28 @@ export interface ISubscriptionSDK { export type IFilterSDK = IReceiver & IBaseProtocolSDK & { protocol: IBaseProtocolCore } & { - createSubscription( - pubsubTopicShardInfo?: ShardingParams | PubsubTopic, - options?: ProtocolUseOptions - ): Promise; + subscribe( + decoders: IDecoder | IDecoder[], + callback: Callback, + protocolUseOptions?: ProtocolUseOptions, + subscribeOptions?: SubscribeOptions + ): Promise; }; +export type SubscribeResult = SubscriptionSuccess | SubscriptionError; + +type SubscriptionSuccess = { + subscription: ISubscriptionSDK; + error: null; + results: SDKProtocolResult; +}; + +type SubscriptionError = { + subscription: null; + error: ProtocolError; + results: null; +}; + export type CreateSubscriptionResult = ThisOrThat< "subscription", ISubscriptionSDK, diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index a9bdeda2e1..6d0a051513 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -173,6 +173,11 @@ export enum ProtocolError { * Ensure that the pubsub topic used for decoder creation is the same as the one used for protocol. */ TOPIC_DECODER_MISMATCH = "Topic decoder mismatch", + /** + * The topics passed in the decoders do not match each other, or don't exist at all. + * Ensure that all the pubsub topics used in the decoders are valid and match each other. + */ + INVALID_DECODER_TOPICS = "Invalid decoder topics", /** * Failure to find a peer with suitable protocols. This may due to a connection issue. * Mitigation can be: retrying after a given time period, display connectivity issue diff --git a/packages/interfaces/src/receiver.ts b/packages/interfaces/src/receiver.ts index f329460b4e..071baed171 100644 --- a/packages/interfaces/src/receiver.ts +++ b/packages/interfaces/src/receiver.ts @@ -13,8 +13,10 @@ export interface IReceiver { toSubscriptionIterator: ( decoders: IDecoder | IDecoder[] ) => Promise>; - subscribe: ( - decoders: IDecoder | IDecoder[], - callback: Callback - ) => Unsubscribe | Promise; + subscribeWithUnsubscribe: SubscribeWithUnsubscribe; } + +type SubscribeWithUnsubscribe = ( + decoders: IDecoder | IDecoder[], + callback: Callback +) => Unsubscribe | Promise; diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index c8c1f39238..6724ae344c 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -148,7 +148,7 @@ class Relay implements IRelay { }; } - public subscribe( + public subscribeWithUnsubscribe( decoders: IDecoder | IDecoder[], callback: Callback ): () => void { @@ -171,6 +171,8 @@ class Relay implements IRelay { }; } + public subscribe = this.subscribeWithUnsubscribe; + private removeObservers( observers: Array<[PubsubTopic, Observer]> ): void { diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index c40661a728..36a3d6f820 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -21,6 +21,7 @@ import { type SDKProtocolResult, type ShardingParams, type SubscribeOptions, + SubscribeResult, type Unsubscribe } from "@waku/interfaces"; import { messageHashStr } from "@waku/message-hash"; @@ -448,19 +449,89 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { this.activeSubscriptions = new Map(); } - //TODO: move to SubscriptionManager - private getActiveSubscription( - pubsubTopic: PubsubTopic - ): SubscriptionManager | undefined { - return this.activeSubscriptions.get(pubsubTopic); - } + /** + * Opens a subscription with the Filter protocol using the provided decoders and callback. + * This method combines the functionality of creating a subscription and subscribing to it. + * + * @param {IDecoder | IDecoder[]} decoders - A single decoder or an array of decoders to use for decoding messages. + * @param {Callback} callback - The callback function to be invoked with decoded messages. + * @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol. + * @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription. + * + * @returns {Promise} A promise that resolves to an object containing: + * - subscription: The created subscription object if successful, or null if failed. + * - error: A ProtocolError if the subscription creation failed, or null if successful. + * - results: An object containing arrays of failures and successes from the subscription process. + * Only present if the subscription was created successfully. + * + * @throws {Error} If there's an unexpected error during the subscription process. + * + * @remarks + * This method attempts to create a subscription using the pubsub topic derived from the provided decoders, + * then tries to subscribe using the created subscription. The return value should be interpreted as follows: + * - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely. + * - If `subscription` is non-null and `error` is null, the subscription was created successfully. + * In this case, check the `results` field for detailed information about successes and failures during the subscription process. + * - Even if the subscription was created successfully, there might be some failures in the results. + * + * @example + * ```typescript + * const {subscription, error, results} = await waku.filter.subscribe(decoders, callback); + * if (!subscription || error) { + * console.error("Failed to create subscription:", error); + * } + * console.log("Subscription created successfully"); + * if (results.failures.length > 0) { + * console.warn("Some errors occurred during subscription:", results.failures); + * } + * console.log("Successful subscriptions:", results.successes); + * + * ``` + */ + public async subscribe( + decoders: IDecoder | IDecoder[], + callback: Callback, + protocolUseOptions?: ProtocolUseOptions, + subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + ): Promise { + const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); - private setActiveSubscription( - pubsubTopic: PubsubTopic, - subscription: SubscriptionManager - ): SubscriptionManager { - this.activeSubscriptions.set(pubsubTopic, subscription); - return subscription; + if (uniquePubsubTopics.length !== 1) { + return { + subscription: null, + error: ProtocolError.INVALID_DECODER_TOPICS, + results: null + }; + } + + const pubsubTopic = uniquePubsubTopics[0]; + + const { subscription, error } = await this.createSubscription( + pubsubTopic, + protocolUseOptions + ); + + if (error) { + return { + subscription: null, + error: error, + results: null + }; + } + + const { failures, successes } = await subscription.subscribe( + decoders, + callback, + subscribeOptions + ); + return { + subscription, + error: null, + results: { + failures: failures, + successes: successes + } + }; } /** @@ -469,7 +540,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { * @param pubsubTopicShardInfo The pubsub topic to subscribe to. * @returns The subscription object. */ - public async createSubscription( + private async createSubscription( pubsubTopicShardInfo: ShardingParams | PubsubTopic, options?: ProtocolUseOptions ): Promise { @@ -516,7 +587,6 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { }; } - //TODO: remove this dependency on IReceiver /** * This method is used to satisfy the `IReceiver` interface. * @@ -532,7 +602,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { * This method should not be used directly. * Instead, use `createSubscription` to create a new subscription. */ - public async subscribe( + public async subscribeWithUnsubscribe( decoders: IDecoder | IDecoder[], callback: Callback, options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS @@ -578,6 +648,21 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { return toAsyncIterator(this, decoders); } + //TODO: move to SubscriptionManager + private getActiveSubscription( + pubsubTopic: PubsubTopic + ): SubscriptionManager | undefined { + return this.activeSubscriptions.get(pubsubTopic); + } + + private setActiveSubscription( + pubsubTopic: PubsubTopic, + subscription: SubscriptionManager + ): SubscriptionManager { + this.activeSubscriptions.set(pubsubTopic, subscription); + return subscription; + } + private getUniquePubsubTopics( decoders: IDecoder | IDecoder[] ): string[] { diff --git a/packages/tests/tests/ephemeral.node.spec.ts b/packages/tests/tests/ephemeral.node.spec.ts index c08a4a49a1..2c11e19c5c 100644 --- a/packages/tests/tests/ephemeral.node.spec.ts +++ b/packages/tests/tests/ephemeral.node.spec.ts @@ -4,7 +4,7 @@ import { DecodedMessage, waitForRemotePeer } from "@waku/core"; -import { ISubscriptionSDK, Protocols } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; import type { LightNode } from "@waku/interfaces"; import { generatePrivateKey, @@ -83,8 +83,6 @@ describe("Waku Message Ephemeral field", function () { let waku: LightNode; let nwaku: ServiceNode; - let subscription: ISubscriptionSDK; - afterEachCustom(this, async () => { await tearDownNodes(nwaku, waku); }); @@ -122,11 +120,6 @@ describe("Waku Message Ephemeral field", function () { Protocols.LightPush, Protocols.Store ]); - - const { error, subscription: _subscription } = - await waku.filter.createSubscription(TestEncoder.pubsubTopic); - if (error) throw error; - subscription = _subscription; }); it("Ephemeral messages are not stored", async function () { @@ -218,7 +211,7 @@ describe("Waku Message Ephemeral field", function () { const callback = (msg: DecodedMessage): void => { messages.push(msg); }; - await subscription.subscribe([TestDecoder], callback); + await waku.filter.subscribe([TestDecoder], callback); await delay(200); const normalTxt = "Normal message"; @@ -265,7 +258,7 @@ describe("Waku Message Ephemeral field", function () { const callback = (msg: DecodedMessage): void => { messages.push(msg); }; - await subscription.subscribe([decoder], callback); + await waku.filter.subscribe([decoder], callback); await delay(200); const normalTxt = "Normal message"; @@ -316,7 +309,7 @@ describe("Waku Message Ephemeral field", function () { const callback = (msg: DecodedMessage): void => { messages.push(msg); }; - await subscription.subscribe([decoder], callback); + await waku.filter.subscribe([decoder], callback); await delay(200); const normalTxt = "Normal message"; diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index f78d83f69e..dee9aa33d9 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -28,7 +28,6 @@ describe("Waku Filter: Peer Management: E2E", function () { this.timeout(15000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; - let subscription: ISubscriptionSDK; const contentTopic = "/test"; @@ -47,13 +46,6 @@ describe("Waku Filter: Peer Management: E2E", function () { undefined, 5 ); - const { error, subscription: sub } = await waku.filter.createSubscription( - DefaultTestPubsubTopic - ); - if (!sub || error) { - throw new Error("Could not create subscription"); - } - subscription = sub; }); afterEachCustom(this, async () => { @@ -62,12 +54,15 @@ describe("Waku Filter: Peer Management: E2E", function () { it("Number of peers are maintained correctly", async function () { const messages: DecodedMessage[] = []; - const { failures, successes } = await subscription.subscribe( - [decoder], - (msg) => { - messages.push(msg); - } - ); + const { error, results } = await waku.filter.subscribe([decoder], (msg) => { + messages.push(msg); + }); + + if (error) { + throw error; + } + + const { successes, failures } = results; await waku.lightPush.send(encoder, { payload: utf8ToBytes("Hello_World") @@ -82,20 +77,42 @@ describe("Waku Filter: Peer Management: E2E", function () { }); it("Ping succeeds for all connected peers", async function () { - await subscription.subscribe([decoder], () => {}); + const { error, subscription } = await waku.filter.subscribe( + [decoder], + () => {} + ); + if (error) { + throw error; + } const pingResult = await subscription.ping(); expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); expect(pingResult.failures.length).to.equal(0); }); it("Ping fails for unsubscribed peers", async function () { + const { error, subscription } = await waku.filter.subscribe( + [decoder], + () => {} + ); + if (error) { + throw error; + } + await subscription.unsubscribe([contentTopic]); const pingResult = await subscription.ping(); expect(pingResult.successes.length).to.equal(0); expect(pingResult.failures.length).to.be.greaterThan(0); }); it("Keep-alive pings maintain the connection", async function () { - await subscription.subscribe([decoder], () => {}, { keepAlive: 100 }); + const { error, subscription } = await waku.filter.subscribe( + [decoder], + () => {}, + undefined, + { keepAlive: 100 } + ); + if (error) { + throw error; + } await delay(1000); @@ -106,9 +123,17 @@ describe("Waku Filter: Peer Management: E2E", function () { it("Renews peer on consistent ping failures", async function () { const maxPingFailures = 3; - await subscription.subscribe([decoder], () => {}, { - pingsBeforePeerRenewed: maxPingFailures - }); + const { error, subscription } = await waku.filter.subscribe( + [decoder], + () => {}, + undefined, + { + pingsBeforePeerRenewed: maxPingFailures + } + ); + if (error) { + throw error; + } const disconnectedNodePeerId = waku.filter.connectedPeers[0].id; await waku.connectionManager.dropConnection(disconnectedNodePeerId); @@ -135,9 +160,17 @@ describe("Waku Filter: Peer Management: E2E", function () { it("Tracks peer failures correctly", async function () { const maxPingFailures = 3; - await subscription.subscribe([decoder], () => {}, { - pingsBeforePeerRenewed: maxPingFailures - }); + const { error, subscription } = await waku.filter.subscribe( + [decoder], + () => {}, + undefined, + { + pingsBeforePeerRenewed: maxPingFailures + } + ); + if (error) { + throw error; + } const targetPeer = waku.filter.connectedPeers[0]; await waku.connectionManager.dropConnection(targetPeer.id); @@ -163,18 +196,24 @@ describe("Waku Filter: Peer Management: E2E", function () { }); it("Maintains correct number of peers after multiple subscribe/unsubscribe cycles", async function () { + let subscription: ISubscriptionSDK; for (let i = 0; i < 3; i++) { - await subscription.subscribe([decoder], () => {}); + const { error, subscription: _subscription } = + await waku.filter.subscribe([decoder], () => {}); + if (error) { + throw error; + } + subscription = _subscription; let pingResult = await subscription.ping(); expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); await subscription.unsubscribe([contentTopic]); pingResult = await subscription.ping(); expect(pingResult.failures.length).to.be.greaterThan(0); + await subscription.subscribe([decoder], () => {}); } - await subscription.subscribe([decoder], () => {}); - const finalPingResult = await subscription.ping(); + const finalPingResult = await subscription!.ping(); expect(finalPingResult.successes.length).to.equal( waku.filter.numPeersToUse ); @@ -200,17 +239,15 @@ describe("Waku Filter: Peer Management: E2E", function () { ).toString(); await waku.dial(await nodeWithoutDiscovery.getMultiaddrWithId()); - const { error, subscription: sub } = await waku.filter.createSubscription( - DefaultTestPubsubTopic - ); - if (!sub || error) { - throw new Error("Could not create subscription"); - } - const messages: DecodedMessage[] = []; - const { successes } = await sub.subscribe([decoder], (msg) => { + const { error, results } = await waku.filter.subscribe([decoder], (msg) => { messages.push(msg); }); + if (error) { + throw error; + } + + const { successes } = results; expect(successes.length).to.be.greaterThan(0); expect(successes.length).to.be.equal(waku.filter.numPeersToUse); diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts index c6e3e11a2b..dcc4c2740b 100644 --- a/packages/tests/tests/filter/ping.node.spec.ts +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -24,14 +24,9 @@ const runTests = (strictCheckNodes: boolean): void => { this.timeout(10000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; - let subscription: ISubscriptionSDK; beforeEachCustom(this, async () => { [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo); - const { error, subscription: _subscription } = - await waku.filter.createSubscription(TestShardInfo); - if (error) throw error; - subscription = _subscription; }); afterEachCustom(this, async () => { @@ -39,10 +34,13 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Ping on subscribed peer", async function () { - await subscription.subscribe( + const { error, subscription } = await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); + if (error) { + throw error; + } await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( true @@ -60,14 +58,25 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Ping on peer without subscriptions", async function () { + const { subscription, error } = await waku.filter.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + if (error) { + throw error; + } + await subscription.unsubscribe([TestContentTopic]); await validatePingError(subscription); }); it("Ping on unsubscribed peer", async function () { - await subscription.subscribe( + const { error, subscription } = await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); + if (error) { + throw error; + } await subscription.ping(); await subscription.unsubscribe([TestContentTopic]); @@ -76,11 +85,17 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Reopen subscription with peer with lost subscription", async function () { + let subscription: ISubscriptionSDK; const openSubscription = async (): Promise => { - await subscription.subscribe( - [TestDecoder], - serviceNodes.messageCollector.callback - ); + const { error, subscription: _subscription } = + await waku.filter.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + if (error) { + throw error; + } + subscription = _subscription; }; const unsubscribe = async (): Promise => { diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index 0b41585ff9..5d69c73948 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -1,5 +1,5 @@ import { waitForRemotePeer } from "@waku/core"; -import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces"; +import { LightNode, Protocols } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; @@ -29,15 +29,9 @@ const runTests = (strictCheckNodes: boolean): void => { this.timeout(10000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; - let subscription: ISubscriptionSDK; beforeEachCustom(this, async () => { [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo); - - const { error, subscription: _subscription } = - await waku.filter.createSubscription(TestShardInfo); - if (error) throw error; - subscription = _subscription; }); afterEachCustom(this, async () => { @@ -46,7 +40,7 @@ const runTests = (strictCheckNodes: boolean): void => { TEST_STRING.forEach((testItem) => { it(`Check received message containing ${testItem.description}`, async function () { - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -67,7 +61,7 @@ const runTests = (strictCheckNodes: boolean): void => { TEST_TIMESTAMPS.forEach((testItem) => { it(`Check received message with timestamp: ${testItem} `, async function () { - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -106,7 +100,7 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Check message with invalid timestamp is not received", async function () { - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -128,7 +122,7 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Check message on other pubsub topic is not received", async function () { - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -151,7 +145,7 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Check message with no content topic is not received", async function () { - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -171,7 +165,7 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Check message with no payload is not received", async function () { - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -199,7 +193,7 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Check message with non string payload is not received", async function () { - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -222,7 +216,7 @@ const runTests = (strictCheckNodes: boolean): void => { // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done it.skip("Check message received after jswaku node is restarted", async function () { // Subscribe and send message - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -242,11 +236,8 @@ const runTests = (strictCheckNodes: boolean): void => { await waku.dial(await node.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); } - const { error, subscription: _subscription } = - await waku.filter.createSubscription(TestShardInfo); - if (error) throw error; - subscription = _subscription; - await subscription.subscribe( + + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -271,7 +262,7 @@ const runTests = (strictCheckNodes: boolean): void => { // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done it.skip("Check message received after nwaku node is restarted", async function () { - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); diff --git a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts index ec8d9515dc..75e12b1919 100644 --- a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts @@ -1,7 +1,6 @@ import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import type { ContentTopicInfo, - ISubscriptionSDK, LightNode, ShardInfo, SingleShardInfo @@ -32,7 +31,6 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { let waku: LightNode; let nwaku: ServiceNode; let nwaku2: ServiceNode; - let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; const customPubsubTopic1 = singleShardInfoToPubsubTopic({ @@ -61,12 +59,6 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, shardInfo); - - const { error, subscription: _subscription } = - await waku.filter.createSubscription(shardInfo); - if (error) throw error; - subscription = _subscription; - messageCollector = new MessageCollector(); }); @@ -75,7 +67,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { }); it("Subscribe and receive messages on custom pubsubtopic", async function () { - await subscription.subscribe([customDecoder1], messageCollector.callback); + await waku.filter.subscribe([customDecoder1], messageCollector.callback); await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { @@ -86,18 +78,11 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { }); it("Subscribe and receive messages on 2 different pubsubtopics", async function () { - await subscription.subscribe([customDecoder1], messageCollector.callback); - - // Subscribe from the same lightnode to the 2nd pubsubtopic - const { error, subscription: subscription2 } = - await waku.filter.createSubscription(customPubsubTopic2); - if (error) { - throw error; - } + await waku.filter.subscribe([customDecoder1], messageCollector.callback); const messageCollector2 = new MessageCollector(); - await subscription2.subscribe([customDecoder2], messageCollector2.callback); + await waku.filter.subscribe([customDecoder2], messageCollector2.callback); await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); @@ -119,7 +104,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { }); it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { - await subscription.subscribe([customDecoder1], messageCollector.callback); + await waku.filter.subscribe([customDecoder1], messageCollector.callback); // Set up and start a new nwaku node with customPubsubTopic1 nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); @@ -133,19 +118,11 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic - const { error, subscription: subscription2 } = - await waku.filter.createSubscription(customPubsubTopic2); - - if (error) { - throw error; - } - await nwaku2.ensureSubscriptions([customPubsubTopic2]); const messageCollector2 = new MessageCollector(); - await subscription2.subscribe([customDecoder2], messageCollector2.callback); + await waku.filter.subscribe([customDecoder2], messageCollector2.callback); // Making sure that messages are send and reveiced for both subscriptions // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 @@ -173,17 +150,6 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { expectedMessageText: "M2" }); }); - - it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () { - // this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2` - try { - await subscription.subscribe([customDecoder2], messageCollector.callback); - } catch (error) { - expect((error as Error).message).to.include( - "Pubsub topic not configured" - ); - } - }); }); describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { @@ -193,7 +159,6 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { let waku: LightNode; let nwaku: ServiceNode; let nwaku2: ServiceNode; - let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; const customContentTopic1 = "/waku/2/content/utf8"; @@ -235,10 +200,6 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, contentTopicInfo); - const { error, subscription: _subscription } = - await waku.filter.createSubscription(autoshardingPubsubTopic1); - if (error) throw error; - subscription = _subscription; messageCollector = new MessageCollector(); }); @@ -247,7 +208,7 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { }); it("Subscribe and receive messages on autosharded pubsubtopic", async function () { - await subscription.subscribe([customDecoder1], messageCollector.callback); + await waku.filter.subscribe([customDecoder1], messageCollector.callback); await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); expect( await messageCollector.waitForMessagesAutosharding(1, { @@ -262,19 +223,10 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { }); it("Subscribe and receive messages on 2 different pubsubtopics", async function () { - await subscription.subscribe([customDecoder1], messageCollector.callback); - - // Subscribe from the same lightnode to the 2nd pubsubtopic - const { error, subscription: subscription2 } = - await waku.filter.createSubscription(autoshardingPubsubTopic2); - - if (error) { - throw error; - } + await waku.filter.subscribe([customDecoder1], messageCollector.callback); const messageCollector2 = new MessageCollector(); - - await subscription2.subscribe([customDecoder2], messageCollector2.callback); + await waku.filter.subscribe([customDecoder2], messageCollector2.callback); await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); @@ -304,7 +256,7 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { }); it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { - await subscription.subscribe([customDecoder1], messageCollector.callback); + await waku.filter.subscribe([customDecoder1], messageCollector.callback); // Set up and start a new nwaku node with customPubsubTopic1 nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); @@ -319,19 +271,11 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic - const { error, subscription: subscription2 } = - await waku.filter.createSubscription(autoshardingPubsubTopic2); - - if (error) { - throw error; - } - await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); const messageCollector2 = new MessageCollector(); - await subscription2.subscribe([customDecoder2], messageCollector2.callback); + await waku.filter.subscribe([customDecoder2], messageCollector2.callback); // Making sure that messages are send and reveiced for both subscriptions // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 @@ -363,7 +307,7 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () { // this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2` try { - await subscription.subscribe([customDecoder2], messageCollector.callback); + await waku.filter.subscribe([customDecoder2], messageCollector.callback); } catch (error) { expect((error as Error).message).to.include( "Pubsub topic not configured" @@ -378,7 +322,6 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { let waku: LightNode; let nwaku: ServiceNode; let nwaku2: ServiceNode; - let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; const customPubsubTopic1 = singleShardInfoToPubsubTopic({ @@ -408,11 +351,6 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, shardInfo); - const { error, subscription: _subscription } = - await waku.filter.createSubscription(customPubsubTopic1); - if (error) throw error; - subscription = _subscription; - messageCollector = new MessageCollector(); }); @@ -421,7 +359,7 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { }); it("Subscribe and receive messages on custom pubsubtopic", async function () { - await subscription.subscribe([customDecoder1], messageCollector.callback); + await waku.filter.subscribe([customDecoder1], messageCollector.callback); await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { @@ -432,18 +370,11 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { }); it("Subscribe and receive messages on 2 different pubsubtopics", async function () { - await subscription.subscribe([customDecoder1], messageCollector.callback); - - // Subscribe from the same lightnode to the 2nd pubsubtopic - const { error, subscription: subscription2 } = - await waku.filter.createSubscription(customPubsubTopic2); - if (error) { - throw error; - } + await waku.filter.subscribe([customDecoder1], messageCollector.callback); const messageCollector2 = new MessageCollector(); - await subscription2.subscribe([customDecoder2], messageCollector2.callback); + await waku.filter.subscribe([customDecoder2], messageCollector2.callback); await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); @@ -465,7 +396,7 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { }); it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { - await subscription.subscribe([customDecoder1], messageCollector.callback); + await waku.filter.subscribe([customDecoder1], messageCollector.callback); // Set up and start a new nwaku node with customPubsubTopic1 nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); @@ -479,17 +410,11 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic - const { error, subscription: subscription2 } = - await waku.filter.createSubscription(customPubsubTopic2); - if (error) { - throw error; - } await nwaku2.ensureSubscriptions([customPubsubTopic2]); const messageCollector2 = new MessageCollector(); - await subscription2.subscribe([customDecoder2], messageCollector2.callback); + await waku.filter.subscribe([customDecoder2], messageCollector2.callback); // Making sure that messages are send and reveiced for both subscriptions // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 @@ -521,7 +446,7 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () { // this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2` try { - await subscription.subscribe([customDecoder2], messageCollector.callback); + await waku.filter.subscribe([customDecoder2], messageCollector.callback); } catch (error) { expect((error as Error).message).to.include( "Pubsub topic not configured" diff --git a/packages/tests/tests/filter/single_node/ping.node.spec.ts b/packages/tests/tests/filter/single_node/ping.node.spec.ts index 71040dfaee..f26395c221 100644 --- a/packages/tests/tests/filter/single_node/ping.node.spec.ts +++ b/packages/tests/tests/filter/single_node/ping.node.spec.ts @@ -24,16 +24,10 @@ describe("Waku Filter V2: Ping", function () { this.timeout(10000); let waku: LightNode; let nwaku: ServiceNode; - let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - - const { error, subscription: _subscription } = - await waku.filter.createSubscription(TestShardInfo); - if (error) throw error; - subscription = _subscription; messageCollector = new MessageCollector(); }); @@ -42,7 +36,13 @@ describe("Waku Filter V2: Ping", function () { }); it("Ping on subscribed peer", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + const { subscription, error } = await waku.filter.subscribe( + [TestDecoder], + messageCollector.callback + ); + if (error) { + throw error; + } await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); expect(await messageCollector.waitForMessages(1)).to.eq(true); @@ -56,11 +56,26 @@ describe("Waku Filter V2: Ping", function () { }); it("Ping on peer without subscriptions", async function () { + const { subscription, error } = await waku.filter.subscribe( + [TestDecoder], + messageCollector.callback + ); + if (error) { + throw error; + } + await subscription.unsubscribe([TestContentTopic]); await validatePingError(subscription); }); it("Ping on unsubscribed peer", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + const { error, subscription } = await waku.filter.subscribe( + [TestDecoder], + messageCollector.callback + ); + if (error) { + throw error; + } + await subscription.ping(); await subscription.unsubscribe([TestContentTopic]); @@ -69,8 +84,16 @@ describe("Waku Filter V2: Ping", function () { }); it("Reopen subscription with peer with lost subscription", async function () { + let subscription: ISubscriptionSDK; const openSubscription = async (): Promise => { - await subscription.subscribe([TestDecoder], messageCollector.callback); + const result = await waku.filter.subscribe( + [TestDecoder], + messageCollector.callback + ); + if (result.error) { + throw result.error; + } + subscription = result.subscription; }; const unsubscribe = async (): Promise => { diff --git a/packages/tests/tests/filter/single_node/push.node.spec.ts b/packages/tests/tests/filter/single_node/push.node.spec.ts index fdb6a75760..6e47e586ae 100644 --- a/packages/tests/tests/filter/single_node/push.node.spec.ts +++ b/packages/tests/tests/filter/single_node/push.node.spec.ts @@ -1,5 +1,5 @@ import { waitForRemotePeer } from "@waku/core"; -import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces"; +import { LightNode, Protocols } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; @@ -28,17 +28,10 @@ describe("Waku Filter V2: FilterPush", function () { this.timeout(10000); let waku: LightNode; let nwaku: ServiceNode; - let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - - const { error, subscription: _subscription } = - await waku.filter.createSubscription(TestShardInfo); - if (error) throw error; - subscription = _subscription; - messageCollector = new MessageCollector(nwaku); }); @@ -48,7 +41,7 @@ describe("Waku Filter V2: FilterPush", function () { TEST_STRING.forEach((testItem) => { it(`Check received message containing ${testItem.description}`, async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(testItem.value) }); @@ -63,7 +56,7 @@ describe("Waku Filter V2: FilterPush", function () { TEST_TIMESTAMPS.forEach((testItem) => { it(`Check received message with timestamp: ${testItem} `, async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await delay(400); await nwaku.restCall( @@ -97,7 +90,7 @@ describe("Waku Filter V2: FilterPush", function () { }); it("Check message with invalid timestamp is not received", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await delay(400); await nwaku.restCall( @@ -116,7 +109,7 @@ describe("Waku Filter V2: FilterPush", function () { }); it("Check message on other pubsub topic is not received", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await delay(400); await nwaku.restCall( @@ -134,7 +127,7 @@ describe("Waku Filter V2: FilterPush", function () { }); it("Check message with no pubsub topic is not received", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await delay(400); await nwaku.restCall( @@ -152,7 +145,7 @@ describe("Waku Filter V2: FilterPush", function () { }); it("Check message with no content topic is not received", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await delay(400); await nwaku.restCall( @@ -169,7 +162,7 @@ describe("Waku Filter V2: FilterPush", function () { }); it("Check message with no payload is not received", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await delay(400); await nwaku.restCall( @@ -191,7 +184,7 @@ describe("Waku Filter V2: FilterPush", function () { }); it("Check message with non string payload is not received", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await delay(400); await nwaku.restCall( @@ -211,7 +204,7 @@ describe("Waku Filter V2: FilterPush", function () { // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done it.skip("Check message received after jswaku node is restarted", async function () { // Subscribe and send message - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); expect(await messageCollector.waitForMessages(1)).to.eq(true); @@ -224,11 +217,8 @@ describe("Waku Filter V2: FilterPush", function () { // Redo the connection and create a new subscription await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - const { error, subscription: _subscription } = - await waku.filter.createSubscription(); - if (error) throw error; - subscription = _subscription; - await subscription.subscribe([TestDecoder], messageCollector.callback); + + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); @@ -246,7 +236,7 @@ describe("Waku Filter V2: FilterPush", function () { // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done it.skip("Check message received after nwaku node is restarted", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); expect(await messageCollector.waitForMessages(1)).to.eq(true); diff --git a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts index 6ef09e3182..928903fac4 100644 --- a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts @@ -1,5 +1,5 @@ import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; -import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces"; +import { LightNode, Protocols } from "@waku/interfaces"; import { ecies, generatePrivateKey, @@ -40,18 +40,11 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { let waku2: LightNode; let nwaku: ServiceNode; let nwaku2: ServiceNode; - let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; let ctx: Context; beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - - const { error, subscription: _subscription } = - await waku.filter.createSubscription(TestShardInfo); - if (error) throw error; - subscription = _subscription; - messageCollector = new MessageCollector(); await nwaku.ensureSubscriptions([TestPubsubTopic]); }); @@ -61,7 +54,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { }); it("Subscribe and receive messages via lightPush", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + const { error } = await waku.filter.subscribe( + [TestDecoder], + messageCollector.callback + ); + if (error) { + throw error; + } await waku.lightPush.send(TestEncoder, messagePayload); @@ -88,7 +87,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { TestPubsubTopic ); - await subscription.subscribe([decoder], messageCollector.callback); + const { error } = await waku.filter.subscribe( + [decoder], + messageCollector.callback + ); + if (error) { + throw error; + } await waku.lightPush.send(encoder, messagePayload); @@ -115,7 +120,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { TestPubsubTopic ); - await subscription.subscribe([decoder], messageCollector.callback); + const { error } = await waku.filter.subscribe( + [decoder], + messageCollector.callback + ); + if (error) { + throw error; + } await waku.lightPush.send(encoder, messagePayload); @@ -130,7 +141,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { }); it("Subscribe and receive messages via waku relay post", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + const { error } = await waku.filter.subscribe( + [TestDecoder], + messageCollector.callback + ); + if (error) { + throw error; + } await delay(400); @@ -152,7 +169,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { }); it("Subscribe and receive 2 messages on the same topic", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await waku.lightPush.send(TestEncoder, messagePayload); @@ -181,7 +198,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { it("Subscribe and receive messages on 2 different content topics", async function () { // Subscribe to the first content topic and send a message. - await subscription.subscribe([TestDecoder], messageCollector.callback); + const { error, subscription } = await waku.filter.subscribe( + [TestDecoder], + messageCollector.callback + ); + if (error) { + throw error; + } await waku.lightPush.send(TestEncoder, messagePayload); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { @@ -227,7 +250,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { // Subscribe to all 20 topics. for (let i = 0; i < topicCount; i++) { - await subscription.subscribe([td.decoders[i]], messageCollector.callback); + await waku.filter.subscribe([td.decoders[i]], messageCollector.callback); } // Send a unique message on each topic. @@ -253,7 +276,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { const topicCount = 100; const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); - await subscription.subscribe(td.decoders, messageCollector.callback); + await waku.filter.subscribe(td.decoders, messageCollector.callback); // Send a unique message on each topic. for (let i = 0; i < topicCount; i++) { @@ -278,10 +301,14 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); try { - const { failures, successes } = await subscription.subscribe( + const { error, results } = await waku.filter.subscribe( td.decoders, messageCollector.callback ); + if (error) { + throw error; + } + const { failures, successes } = results; if (failures.length === 0 || successes.length > 0) { throw new Error( `Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.` @@ -309,10 +336,10 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { const td2 = generateTestData(topicCount2, { pubsubTopic: TestPubsubTopic }); // Subscribe to the first set of topics. - await subscription.subscribe(td1.decoders, messageCollector.callback); + await waku.filter.subscribe(td1.decoders, messageCollector.callback); // Subscribe to the second set of topics which has overlapping topics with the first set. - await subscription.subscribe(td2.decoders, messageCollector.callback); + await waku.filter.subscribe(td2.decoders, messageCollector.callback); // Send messages to the first set of topics. for (let i = 0; i < topicCount1; i++) { @@ -339,11 +366,11 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { }); it("Refresh subscription", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); // Resubscribe (refresh) to the same topic and send another message. - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); // Confirm both messages were received. @@ -371,7 +398,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { }); const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await subscription.subscribe([newDecoder], messageCollector.callback); + await waku.filter.subscribe([newDecoder], messageCollector.callback); await waku.lightPush.send(newEncoder, messagePayload); expect(await messageCollector.waitForMessages(1)).to.eq(true); @@ -384,22 +411,16 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { }); it("Add multiple subscription objects on single nwaku node", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - // Create a second subscription on a different topic - const { error, subscription: subscription2 } = - await waku.filter.createSubscription(TestShardInfo); - if (error) { - throw error; - } const newContentTopic = "/test/2/waku-filter/default"; const newEncoder = createEncoder({ contentTopic: newContentTopic, pubsubTopic: TestPubsubTopic }); const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await subscription2.subscribe([newDecoder], messageCollector.callback); + await waku.filter.subscribe([newDecoder], messageCollector.callback); await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); @@ -418,17 +439,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { }); it("Subscribe and receive messages from multiple nwaku nodes", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.filter.subscribe([TestDecoder], messageCollector.callback); // Set up and start a new nwaku node [nwaku2, waku2] = await runNodes(ctx, TestShardInfo); await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - const { error, subscription: subscription2 } = - await waku.filter.createSubscription(TestShardInfo); - if (error) { - throw error; - } + await nwaku2.ensureSubscriptions([TestPubsubTopic]); // Send a message using the new subscription const newContentTopic = "/test/2/waku-filter/default"; @@ -437,7 +454,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { pubsubTopic: TestPubsubTopic }); const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await subscription2.subscribe([newDecoder], messageCollector.callback); + await waku.filter.subscribe([newDecoder], messageCollector.callback); // Making sure that messages are send and reveiced for both subscriptions while (!(await messageCollector.waitForMessages(2))) { diff --git a/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts b/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts index 1f7a6258a4..a2af7c4b4b 100644 --- a/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts @@ -1,5 +1,4 @@ import { createDecoder, createEncoder } from "@waku/core"; -import { ISubscriptionSDK } from "@waku/interfaces"; import { LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; @@ -28,16 +27,11 @@ describe("Waku Filter V2: Unsubscribe", function () { this.timeout(10000); let waku: LightNode; let nwaku: ServiceNode; - let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - const { error, subscription: _subscription } = - await waku.filter.createSubscription(TestShardInfo); - if (error) throw error; - subscription = _subscription; messageCollector = new MessageCollector(); await nwaku.ensureSubscriptions([TestPubsubTopic]); }); @@ -47,7 +41,13 @@ describe("Waku Filter V2: Unsubscribe", function () { }); it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + const { subscription, error } = await waku.filter.subscribe( + [TestDecoder], + messageCollector.callback + ); + if (error) { + throw error; + } await waku.lightPush.send(TestEncoder, messagePayload); expect(await messageCollector.waitForMessages(1)).to.eq(true); @@ -68,7 +68,13 @@ describe("Waku Filter V2: Unsubscribe", function () { it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () { // Subscribe to 2 topics and send messages - await subscription.subscribe([TestDecoder], messageCollector.callback); + const { error, subscription } = await waku.filter.subscribe( + [TestDecoder], + messageCollector.callback + ); + if (error) { + throw error; + } const newContentTopic = "/test/2/waku-filter"; const newEncoder = createEncoder({ contentTopic: newContentTopic, @@ -93,7 +99,13 @@ describe("Waku Filter V2: Unsubscribe", function () { it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () { // Subscribe to 2 topics and send messages - await subscription.subscribe([TestDecoder], messageCollector.callback); + const { error, subscription } = await waku.filter.subscribe( + [TestDecoder], + messageCollector.callback + ); + if (error) { + throw error; + } const newContentTopic = "/test/2/waku-filter/default"; const newEncoder = createEncoder({ contentTopic: newContentTopic, @@ -118,7 +130,13 @@ describe("Waku Filter V2: Unsubscribe", function () { it("Unsubscribe topics the node is not subscribed to", async function () { // Subscribe to 1 topic and send message - await subscription.subscribe([TestDecoder], messageCollector.callback); + const { error, subscription } = await waku.filter.subscribe( + [TestDecoder], + messageCollector.callback + ); + if (error) { + throw error; + } await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); expect(await messageCollector.waitForMessages(1)).to.eq(true); @@ -136,7 +154,13 @@ describe("Waku Filter V2: Unsubscribe", function () { }); it("Unsubscribes all - node subscribed to 1 topic", async function () { - await subscription.subscribe([TestDecoder], messageCollector.callback); + const { error, subscription } = await waku.filter.subscribe( + [TestDecoder], + messageCollector.callback + ); + if (error) { + throw error; + } await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); expect(await messageCollector.waitForMessages(1)).to.eq(true); expect(messageCollector.count).to.eq(1); @@ -155,7 +179,13 @@ describe("Waku Filter V2: Unsubscribe", function () { // Subscribe to 10 topics and send message const topicCount = 10; const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); - await subscription.subscribe(td.decoders, messageCollector.callback); + const { error, subscription } = await waku.filter.subscribe( + td.decoders, + messageCollector.callback + ); + if (error) { + throw error; + } for (let i = 0; i < topicCount; i++) { await waku.lightPush.send(td.encoders[i], { payload: utf8ToBytes(`M${i + 1}`) diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index b350643670..46a0cf9989 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -1,5 +1,5 @@ import { createDecoder, createEncoder } from "@waku/core"; -import { ISubscriptionSDK, LightNode } from "@waku/interfaces"; +import { LightNode } from "@waku/interfaces"; import { ecies, generatePrivateKey, @@ -36,7 +36,6 @@ const runTests = (strictCheckNodes: boolean): void => { this.timeout(100000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; - let subscription: ISubscriptionSDK; beforeEachCustom(this, async () => { [serviceNodes, waku] = await runMultipleNodes( @@ -45,12 +44,6 @@ const runTests = (strictCheckNodes: boolean): void => { undefined, strictCheckNodes ); - const { error, subscription: _subscription } = - await waku.filter.createSubscription(TestShardInfo); - - if (!error) { - subscription = _subscription; - } }); afterEachCustom(this, async () => { @@ -60,7 +53,7 @@ const runTests = (strictCheckNodes: boolean): void => { it("Subscribe and receive messages via lightPush", async function () { expect(waku.libp2p.getConnections()).has.length(3); - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -92,7 +85,7 @@ const runTests = (strictCheckNodes: boolean): void => { TestPubsubTopic ); - await subscription.subscribe( + await waku.filter.subscribe( [decoder], serviceNodes.messageCollector.callback ); @@ -125,7 +118,7 @@ const runTests = (strictCheckNodes: boolean): void => { TestPubsubTopic ); - await subscription.subscribe( + await waku.filter.subscribe( [decoder], serviceNodes.messageCollector.callback ); @@ -146,7 +139,7 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Subscribe and receive messages via waku relay post", async function () { - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -173,7 +166,7 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Subscribe and receive 2 messages on the same topic", async function () { - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -208,7 +201,7 @@ const runTests = (strictCheckNodes: boolean): void => { it("Subscribe and receive messages on 2 different content topics", async function () { // Subscribe to the first content topic and send a message. - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -231,7 +224,7 @@ const runTests = (strictCheckNodes: boolean): void => { pubsubTopic: TestPubsubTopic }); const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await subscription.subscribe( + await waku.filter.subscribe( [newDecoder], serviceNodes.messageCollector.callback ); @@ -267,7 +260,7 @@ const runTests = (strictCheckNodes: boolean): void => { // Subscribe to all 20 topics. for (let i = 0; i < topicCount; i++) { - await subscription.subscribe( + await waku.filter.subscribe( [td.decoders[i]], serviceNodes.messageCollector.callback ); @@ -298,7 +291,7 @@ const runTests = (strictCheckNodes: boolean): void => { const topicCount = 100; const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); - await subscription.subscribe( + await waku.filter.subscribe( td.decoders, serviceNodes.messageCollector.callback ); @@ -328,10 +321,14 @@ const runTests = (strictCheckNodes: boolean): void => { const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); try { - const { failures, successes } = await subscription.subscribe( + const { error, results } = await waku.filter.subscribe( td.decoders, serviceNodes.messageCollector.callback ); + if (error) { + throw error; + } + const { failures, successes } = results; if (failures.length === 0 || successes.length > 0) { throw new Error( `Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.` @@ -363,13 +360,13 @@ const runTests = (strictCheckNodes: boolean): void => { }); // Subscribe to the first set of topics. - await subscription.subscribe( + await waku.filter.subscribe( td1.decoders, serviceNodes.messageCollector.callback ); // Subscribe to the second set of topics which has overlapping topics with the first set. - await subscription.subscribe( + await waku.filter.subscribe( td2.decoders, serviceNodes.messageCollector.callback ); @@ -398,14 +395,14 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Refresh subscription", async function () { - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); // Resubscribe (refresh) to the same topic and send another message. - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -436,7 +433,7 @@ const runTests = (strictCheckNodes: boolean): void => { }); const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await subscription.subscribe( + await waku.filter.subscribe( [newDecoder], serviceNodes.messageCollector.callback ); @@ -454,25 +451,19 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Add multiple subscription objects on single nwaku node", async function () { - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - // Create a second subscription on a different topic - const { error, subscription: subscription2 } = - await waku.filter.createSubscription(TestShardInfo); - if (error) { - throw error; - } const newContentTopic = "/test/2/waku-filter/default"; const newEncoder = createEncoder({ contentTopic: newContentTopic, pubsubTopic: TestPubsubTopic }); const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await subscription2.subscribe( + await waku.filter.subscribe( [newDecoder], serviceNodes.messageCollector.callback ); diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts index 7f3a74bf11..de644d9cc6 100644 --- a/packages/tests/tests/filter/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -1,5 +1,5 @@ import { createDecoder, createEncoder } from "@waku/core"; -import { ISubscriptionSDK, LightNode } from "@waku/interfaces"; +import { type LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; @@ -28,22 +28,12 @@ const runTests = (strictCheckNodes: boolean): void => { this.timeout(10000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; - let subscription: ISubscriptionSDK; beforeEachCustom(this, async () => { [serviceNodes, waku] = await runMultipleNodes(this.ctx, { contentTopics: [TestContentTopic], clusterId: ClusterId }); - const { error, subscription: _subscription } = - await waku.filter.createSubscription({ - contentTopics: [TestContentTopic], - clusterId: ClusterId - }); - - if (!error) { - subscription = _subscription; - } }); afterEachCustom(this, async () => { @@ -51,10 +41,13 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () { - await subscription.subscribe( + const { error, subscription } = await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); + if (error) { + throw error; + } await waku.lightPush.send(TestEncoder, messagePayload); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( true @@ -79,17 +72,20 @@ const runTests = (strictCheckNodes: boolean): void => { it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () { // Subscribe to 2 topics and send messages - await subscription.subscribe( + const { error, subscription } = await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); + if (error) { + throw error; + } const newContentTopic = "/test/2/waku-filter"; const newEncoder = createEncoder({ contentTopic: newContentTopic, pubsubTopic: TestPubsubTopic }); const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await subscription.subscribe( + await waku.filter.subscribe( [newDecoder], serviceNodes.messageCollector.callback ); @@ -114,7 +110,7 @@ const runTests = (strictCheckNodes: boolean): void => { it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () { // Subscribe to 2 topics and send messages - await subscription.subscribe( + await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); @@ -124,10 +120,13 @@ const runTests = (strictCheckNodes: boolean): void => { pubsubTopic: TestPubsubTopic }); const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await subscription.subscribe( + const { error, subscription } = await waku.filter.subscribe( [newDecoder], serviceNodes.messageCollector.callback ); + if (error) { + throw error; + } await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( @@ -149,10 +148,13 @@ const runTests = (strictCheckNodes: boolean): void => { it("Unsubscribe topics the node is not subscribed to", async function () { // Subscribe to 1 topic and send message - await subscription.subscribe( + const { error, subscription } = await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); + if (error) { + throw error; + } await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( true @@ -174,10 +176,13 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Unsubscribes all - node subscribed to 1 topic", async function () { - await subscription.subscribe( + const { error, subscription } = await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback ); + if (error) { + throw error; + } await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( true @@ -200,10 +205,13 @@ const runTests = (strictCheckNodes: boolean): void => { // Subscribe to 10 topics and send message const topicCount = 10; const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); - await subscription.subscribe( + const { error, subscription } = await waku.filter.subscribe( td.decoders, serviceNodes.messageCollector.callback ); + if (error) { + throw error; + } for (let i = 0; i < topicCount; i++) { await waku.lightPush.send(td.encoders[i], { payload: utf8ToBytes(`M${i + 1}`) diff --git a/packages/tests/tests/health-manager/node.spec.ts b/packages/tests/tests/health-manager/node.spec.ts index 823ae9dc19..1eaebaab68 100644 --- a/packages/tests/tests/health-manager/node.spec.ts +++ b/packages/tests/tests/health-manager/node.spec.ts @@ -10,7 +10,12 @@ import { ServiceNodesFleet } from "../../src"; -import { messagePayload, TestEncoder, TestShardInfo } from "./utils"; +import { + messagePayload, + TestDecoder, + TestEncoder, + TestShardInfo +} from "./utils"; describe("Node Health Status Matrix Tests", function () { let waku: LightNode; @@ -46,7 +51,7 @@ describe("Node Health Status Matrix Tests", function () { } if (filterPeers > 0) { - await waku.filter.createSubscription(TestShardInfo); + await waku.filter.subscribe([TestDecoder], () => {}); } const lightPushHealth = waku.health.getProtocolStatus( diff --git a/packages/tests/tests/health-manager/protocols.spec.ts b/packages/tests/tests/health-manager/protocols.spec.ts index 965753f420..a0db807616 100644 --- a/packages/tests/tests/health-manager/protocols.spec.ts +++ b/packages/tests/tests/health-manager/protocols.spec.ts @@ -67,14 +67,12 @@ describe("Health Manager", function () { num ); - const { error, subscription } = - await waku.filter.createSubscription(TestShardInfo); + const { error } = await waku.filter.subscribe([TestDecoder], () => {}); + if (error) { expect(error).to.not.equal(undefined); } - await subscription?.subscribe([TestDecoder], () => {}); - const health = waku.health.getProtocolStatus(Protocols.Filter); if (!health) { expect(health).to.not.equal(undefined); diff --git a/packages/tests/tests/relay/index.node.spec.ts b/packages/tests/tests/relay/index.node.spec.ts index f9f4175f21..d9d0860427 100644 --- a/packages/tests/tests/relay/index.node.spec.ts +++ b/packages/tests/tests/relay/index.node.spec.ts @@ -67,10 +67,10 @@ describe("Waku Relay", function () { const symDecoder = createSymDecoder(symTopic, symKey, TestPubsubTopic); const msgs: DecodedMessage[] = []; - void waku2.relay.subscribe([eciesDecoder], (wakuMsg) => { + void waku2.relay.subscribeWithUnsubscribe([eciesDecoder], (wakuMsg) => { msgs.push(wakuMsg); }); - void waku2.relay.subscribe([symDecoder], (wakuMsg) => { + void waku2.relay.subscribeWithUnsubscribe([symDecoder], (wakuMsg) => { msgs.push(wakuMsg); }); @@ -97,7 +97,7 @@ describe("Waku Relay", function () { // The promise **fails** if we receive a message on this observer. const receivedMsgPromise: Promise = new Promise( (resolve, reject) => { - const deleteObserver = waku2.relay.subscribe( + const deleteObserver = waku2.relay.subscribeWithUnsubscribe( [createDecoder(contentTopic)], reject ) as () => void; diff --git a/packages/tests/tests/relay/interop.node.spec.ts b/packages/tests/tests/relay/interop.node.spec.ts index 09909bb673..ed6128bf52 100644 --- a/packages/tests/tests/relay/interop.node.spec.ts +++ b/packages/tests/tests/relay/interop.node.spec.ts @@ -76,8 +76,9 @@ describe("Waku Relay, Interop", function () { const receivedMsgPromise: Promise = new Promise( (resolve) => { - void waku.relay.subscribe(TestDecoder, (msg) => - resolve(msg) + void waku.relay.subscribeWithUnsubscribe( + TestDecoder, + (msg) => resolve(msg) ); } ); @@ -119,7 +120,7 @@ describe("Waku Relay, Interop", function () { const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - void waku2.relay.subscribe(TestDecoder, resolve); + void waku2.relay.subscribeWithUnsubscribe(TestDecoder, resolve); } ); diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index b6b4d8a4a3..1ca29a2411 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -124,9 +124,18 @@ describe("Waku Relay, multiple pubsub topics", function () { waitForRemotePeer(waku3, [Protocols.Relay]) ]); - await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback); - await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback); - await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback); + await waku1.relay.subscribeWithUnsubscribe( + [testItem.decoder], + msgCollector1.callback + ); + await waku2.relay.subscribeWithUnsubscribe( + [testItem.decoder], + msgCollector2.callback + ); + await waku3.relay.subscribeWithUnsubscribe( + [testItem.decoder], + msgCollector3.callback + ); // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network const relayResponse1 = await waku1.relay.send(testItem.encoder, { @@ -222,15 +231,18 @@ describe("Waku Relay, multiple pubsub topics", function () { waitForRemotePeer(waku3, [Protocols.Relay]) ]); - await waku1.relay.subscribe( + await waku1.relay.subscribeWithUnsubscribe( [customDecoder1, customDecoder2], msgCollector1.callback ); - await waku2.relay.subscribe( + await waku2.relay.subscribeWithUnsubscribe( [customDecoder1, customDecoder2], msgCollector2.callback ); - await waku3.relay.subscribe([customDecoder1], msgCollector3.callback); + await waku3.relay.subscribeWithUnsubscribe( + [customDecoder1], + msgCollector3.callback + ); // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic @@ -290,7 +302,7 @@ describe("Waku Relay, multiple pubsub topics", function () { const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - void waku2.relay.subscribe([customDecoder1], resolve); + void waku2.relay.subscribeWithUnsubscribe([customDecoder1], resolve); } ); @@ -298,7 +310,7 @@ describe("Waku Relay, multiple pubsub topics", function () { // pubsub topic. const waku3NoMsgPromise: Promise = new Promise( (resolve, reject) => { - void waku3.relay.subscribe([TestDecoder], reject); + void waku3.relay.subscribeWithUnsubscribe([TestDecoder], reject); setTimeout(resolve, 1000); } ); @@ -417,9 +429,18 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { waitForRemotePeer(waku3, [Protocols.Relay]) ]); - await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback); - await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback); - await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback); + await waku1.relay.subscribeWithUnsubscribe( + [testItem.decoder], + msgCollector1.callback + ); + await waku2.relay.subscribeWithUnsubscribe( + [testItem.decoder], + msgCollector2.callback + ); + await waku3.relay.subscribeWithUnsubscribe( + [testItem.decoder], + msgCollector3.callback + ); // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network const relayResponse1 = await waku1.relay.send(testItem.encoder, { @@ -524,15 +545,18 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { waitForRemotePeer(waku3, [Protocols.Relay]) ]); - await waku1.relay.subscribe( + await waku1.relay.subscribeWithUnsubscribe( [customDecoder1, customDecoder2], msgCollector1.callback ); - await waku2.relay.subscribe( + await waku2.relay.subscribeWithUnsubscribe( [customDecoder1, customDecoder2], msgCollector2.callback ); - await waku3.relay.subscribe([customDecoder1], msgCollector3.callback); + await waku3.relay.subscribeWithUnsubscribe( + [customDecoder1], + msgCollector3.callback + ); // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic @@ -619,7 +643,7 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - void waku2.relay.subscribe([customDecoder1], resolve); + void waku2.relay.subscribeWithUnsubscribe([customDecoder1], resolve); } ); @@ -627,7 +651,7 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { // pubsub topic. const waku3NoMsgPromise: Promise = new Promise( (resolve, reject) => { - void waku3.relay.subscribe([TestDecoder], reject); + void waku3.relay.subscribeWithUnsubscribe([TestDecoder], reject); setTimeout(resolve, 1000); } ); @@ -725,9 +749,18 @@ describe("Waku Relay (named sharding), multiple pubsub topics", function () { waitForRemotePeer(waku3, [Protocols.Relay]) ]); - await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback); - await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback); - await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback); + await waku1.relay.subscribeWithUnsubscribe( + [testItem.decoder], + msgCollector1.callback + ); + await waku2.relay.subscribeWithUnsubscribe( + [testItem.decoder], + msgCollector2.callback + ); + await waku3.relay.subscribeWithUnsubscribe( + [testItem.decoder], + msgCollector3.callback + ); // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network const relayResponse1 = await waku1.relay.send(testItem.encoder, { @@ -823,15 +856,18 @@ describe("Waku Relay (named sharding), multiple pubsub topics", function () { waitForRemotePeer(waku3, [Protocols.Relay]) ]); - await waku1.relay.subscribe( + await waku1.relay.subscribeWithUnsubscribe( [customDecoder1, customDecoder2], msgCollector1.callback ); - await waku2.relay.subscribe( + await waku2.relay.subscribeWithUnsubscribe( [customDecoder1, customDecoder2], msgCollector2.callback ); - await waku3.relay.subscribe([customDecoder1], msgCollector3.callback); + await waku3.relay.subscribeWithUnsubscribe( + [customDecoder1], + msgCollector3.callback + ); // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic @@ -891,7 +927,7 @@ describe("Waku Relay (named sharding), multiple pubsub topics", function () { const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - void waku2.relay.subscribe([customDecoder1], resolve); + void waku2.relay.subscribeWithUnsubscribe([customDecoder1], resolve); } ); @@ -899,7 +935,7 @@ describe("Waku Relay (named sharding), multiple pubsub topics", function () { // pubsub topic. const waku3NoMsgPromise: Promise = new Promise( (resolve, reject) => { - void waku3.relay.subscribe([TestDecoder], reject); + void waku3.relay.subscribeWithUnsubscribe([TestDecoder], reject); setTimeout(resolve, 1000); } ); diff --git a/packages/tests/tests/relay/publish.node.spec.ts b/packages/tests/tests/relay/publish.node.spec.ts index f93c4ea609..f7fc8be951 100644 --- a/packages/tests/tests/relay/publish.node.spec.ts +++ b/packages/tests/tests/relay/publish.node.spec.ts @@ -35,7 +35,10 @@ describe("Waku Relay, Publish", function () { beforeEachCustom(this, async () => { [waku1, waku2] = await runJSNodes(); messageCollector = new MessageCollector(); - await waku2.relay.subscribe([TestDecoder], messageCollector.callback); + await waku2.relay.subscribeWithUnsubscribe( + [TestDecoder], + messageCollector.callback + ); }); afterEachCustom(this, async () => { diff --git a/packages/tests/tests/relay/subscribe.node.spec.ts b/packages/tests/tests/relay/subscribe.node.spec.ts index b234acd53b..a147c4ad68 100644 --- a/packages/tests/tests/relay/subscribe.node.spec.ts +++ b/packages/tests/tests/relay/subscribe.node.spec.ts @@ -85,7 +85,10 @@ describe("Waku Relay, Subscribe", function () { }); it("Subscribe and publish message", async function () { - await waku2.relay.subscribe([TestDecoder], messageCollector.callback); + await waku2.relay.subscribeWithUnsubscribe( + [TestDecoder], + messageCollector.callback + ); await waku1.relay.send(TestEncoder, { payload: utf8ToBytes(messageText) }); expect( await messageCollector.waitForMessages(1, TestWaitMessageOptions) @@ -98,7 +101,10 @@ describe("Waku Relay, Subscribe", function () { it("Subscribe and publish 10000 messages on the same topic", async function () { const messageCount = 10000; - await waku2.relay.subscribe([TestDecoder], messageCollector.callback); + await waku2.relay.subscribeWithUnsubscribe( + [TestDecoder], + messageCollector.callback + ); // Send a unique message on each topic. for (let i = 0; i < messageCount; i++) { await waku1.relay.send(TestEncoder, { @@ -131,8 +137,14 @@ describe("Waku Relay, Subscribe", function () { }); const secondDecoder = createDecoder(secondContentTopic, TestPubsubTopic); - await waku2.relay.subscribe([TestDecoder], messageCollector.callback); - await waku2.relay.subscribe([secondDecoder], messageCollector.callback); + await waku2.relay.subscribeWithUnsubscribe( + [TestDecoder], + messageCollector.callback + ); + await waku2.relay.subscribeWithUnsubscribe( + [secondDecoder], + messageCollector.callback + ); await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") }); await waku1.relay.send(secondEncoder, { payload: utf8ToBytes("M2") }); expect( @@ -158,7 +170,10 @@ describe("Waku Relay, Subscribe", function () { // Subscribe to topics one by one for (let i = 0; i < topicCount; i++) { - await waku2.relay.subscribe([td.decoders[i]], messageCollector.callback); + await waku2.relay.subscribeWithUnsubscribe( + [td.decoders[i]], + messageCollector.callback + ); } // Send a unique message on each topic. @@ -189,7 +204,10 @@ describe("Waku Relay, Subscribe", function () { const td = generateTestData(topicCount, TestWaitMessageOptions); // Subscribe to all topics at once - await waku2.relay.subscribe(td.decoders, messageCollector.callback); + await waku2.relay.subscribeWithUnsubscribe( + td.decoders, + messageCollector.callback + ); // Send a unique message on each topic. for (let i = 0; i < topicCount; i++) { @@ -217,8 +235,14 @@ describe("Waku Relay, Subscribe", function () { // Will be skipped until https://github.com/waku-org/js-waku/issues/1678 is fixed it.skip("Refresh subscription", async function () { - await waku2.relay.subscribe([TestDecoder], messageCollector.callback); - await waku2.relay.subscribe([TestDecoder], messageCollector.callback); + await waku2.relay.subscribeWithUnsubscribe( + [TestDecoder], + messageCollector.callback + ); + await waku2.relay.subscribeWithUnsubscribe( + [TestDecoder], + messageCollector.callback + ); await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") }); @@ -239,9 +263,15 @@ describe("Waku Relay, Subscribe", function () { const td2 = generateTestData(topicCount2, TestWaitMessageOptions); // Subscribe to the first set of topics. - await waku2.relay.subscribe(td1.decoders, messageCollector.callback); + await waku2.relay.subscribeWithUnsubscribe( + td1.decoders, + messageCollector.callback + ); // Subscribe to the second set of topics which has overlapping topics with the first set. - await waku2.relay.subscribe(td2.decoders, messageCollector.callback); + await waku2.relay.subscribeWithUnsubscribe( + td2.decoders, + messageCollector.callback + ); // Send messages to the first set of topics. for (let i = 0; i < topicCount1; i++) { @@ -278,7 +308,10 @@ describe("Waku Relay, Subscribe", function () { }); const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await waku2.relay.subscribe([newDecoder], messageCollector.callback); + await waku2.relay.subscribeWithUnsubscribe( + [newDecoder], + messageCollector.callback + ); await waku1.relay.send(newEncoder, { payload: utf8ToBytes(messageText) }); diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index 85d56a6258..e2c43c33e9 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -223,7 +223,7 @@ describe("Decryption Keys", function () { const receivedMsgPromise: Promise = new Promise( (resolve) => { - void waku2.relay.subscribe([decoder], resolve); + void waku2.relay.subscribeWithUnsubscribe([decoder], resolve); } ); diff --git a/packages/utils/src/common/to_async_iterator.ts b/packages/utils/src/common/to_async_iterator.ts index 489ef234ef..a0226369a8 100644 --- a/packages/utils/src/common/to_async_iterator.ts +++ b/packages/utils/src/common/to_async_iterator.ts @@ -37,9 +37,12 @@ export async function toAsyncIterator( const messages: T[] = []; let unsubscribe: undefined | Unsubscribe; - unsubscribe = await receiver.subscribe(decoder, (message: T) => { - messages.push(message); - }); + unsubscribe = await receiver.subscribeWithUnsubscribe( + decoder, + (message: T) => { + messages.push(message); + } + ); const isWithTimeout = Number.isInteger(iteratorOptions?.timeoutMs); const timeoutMs = iteratorOptions?.timeoutMs ?? 0;