feat(filter)!: new simpler filter API (#2092)

* chore: rename IReceiver subscribe

* feat!: new `subscribe() API that only takes in decoders and callback

* chore: `to_async_iterator` uses new function name

* chore: make `createSubscription` private, and shorten error handling

* chore: update subscribe return type

* tests: use new API

* fix: tests
This commit is contained in:
Danish Arora 2024-08-05 15:52:58 +05:30 committed by GitHub
parent fea4f2577b
commit fdd9dc44a4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 586 additions and 378 deletions

View File

@ -1,15 +1,14 @@
import type { PeerId } from "@libp2p/interface";
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { ContentTopic, PubsubTopic, ThisOrThat } from "./misc.js";
import type { ContentTopic, ThisOrThat } from "./misc.js";
import type {
Callback,
IBaseProtocolCore,
IBaseProtocolSDK,
ProtocolError,
ProtocolUseOptions,
SDKProtocolResult,
ShardingParams
SDKProtocolResult
} from "./protocols.js";
import type { IReceiver } from "./receiver.js";
@ -37,12 +36,28 @@ export interface ISubscriptionSDK {
export type IFilterSDK = IReceiver &
IBaseProtocolSDK & { protocol: IBaseProtocolCore } & {
createSubscription(
pubsubTopicShardInfo?: ShardingParams | PubsubTopic,
options?: ProtocolUseOptions
): Promise<CreateSubscriptionResult>;
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
protocolUseOptions?: ProtocolUseOptions,
subscribeOptions?: SubscribeOptions
): Promise<SubscribeResult>;
};
export type SubscribeResult = SubscriptionSuccess | SubscriptionError;
type SubscriptionSuccess = {
subscription: ISubscriptionSDK;
error: null;
results: SDKProtocolResult;
};
type SubscriptionError = {
subscription: null;
error: ProtocolError;
results: null;
};
export type CreateSubscriptionResult = ThisOrThat<
"subscription",
ISubscriptionSDK,

View File

@ -173,6 +173,11 @@ export enum ProtocolError {
* Ensure that the pubsub topic used for decoder creation is the same as the one used for protocol.
*/
TOPIC_DECODER_MISMATCH = "Topic decoder mismatch",
/**
* The topics passed in the decoders do not match each other, or don't exist at all.
* Ensure that all the pubsub topics used in the decoders are valid and match each other.
*/
INVALID_DECODER_TOPICS = "Invalid decoder topics",
/**
* Failure to find a peer with suitable protocols. This may due to a connection issue.
* Mitigation can be: retrying after a given time period, display connectivity issue

View File

@ -13,8 +13,10 @@ export interface IReceiver {
toSubscriptionIterator: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
) => Promise<IAsyncIterator<T>>;
subscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
) => Unsubscribe | Promise<Unsubscribe>;
subscribeWithUnsubscribe: SubscribeWithUnsubscribe;
}
type SubscribeWithUnsubscribe = <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
) => Unsubscribe | Promise<Unsubscribe>;

View File

@ -148,7 +148,7 @@ class Relay implements IRelay {
};
}
public subscribe<T extends IDecodedMessage>(
public subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): () => void {
@ -171,6 +171,8 @@ class Relay implements IRelay {
};
}
public subscribe = this.subscribeWithUnsubscribe;
private removeObservers<T extends IDecodedMessage>(
observers: Array<[PubsubTopic, Observer<T>]>
): void {

View File

@ -21,6 +21,7 @@ import {
type SDKProtocolResult,
type ShardingParams,
type SubscribeOptions,
SubscribeResult,
type Unsubscribe
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
@ -448,19 +449,89 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
this.activeSubscriptions = new Map();
}
//TODO: move to SubscriptionManager
private getActiveSubscription(
pubsubTopic: PubsubTopic
): SubscriptionManager | undefined {
return this.activeSubscriptions.get(pubsubTopic);
}
/**
* Opens a subscription with the Filter protocol using the provided decoders and callback.
* This method combines the functionality of creating a subscription and subscribing to it.
*
* @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
* @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
* @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol.
* @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription.
*
* @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
* - subscription: The created subscription object if successful, or null if failed.
* - error: A ProtocolError if the subscription creation failed, or null if successful.
* - results: An object containing arrays of failures and successes from the subscription process.
* Only present if the subscription was created successfully.
*
* @throws {Error} If there's an unexpected error during the subscription process.
*
* @remarks
* This method attempts to create a subscription using the pubsub topic derived from the provided decoders,
* then tries to subscribe using the created subscription. The return value should be interpreted as follows:
* - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely.
* - If `subscription` is non-null and `error` is null, the subscription was created successfully.
* In this case, check the `results` field for detailed information about successes and failures during the subscription process.
* - Even if the subscription was created successfully, there might be some failures in the results.
*
* @example
* ```typescript
* const {subscription, error, results} = await waku.filter.subscribe(decoders, callback);
* if (!subscription || error) {
* console.error("Failed to create subscription:", error);
* }
* console.log("Subscription created successfully");
* if (results.failures.length > 0) {
* console.warn("Some errors occurred during subscription:", results.failures);
* }
* console.log("Successful subscriptions:", results.successes);
*
* ```
*/
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
protocolUseOptions?: ProtocolUseOptions,
subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SubscribeResult> {
const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);
private setActiveSubscription(
pubsubTopic: PubsubTopic,
subscription: SubscriptionManager
): SubscriptionManager {
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
if (uniquePubsubTopics.length !== 1) {
return {
subscription: null,
error: ProtocolError.INVALID_DECODER_TOPICS,
results: null
};
}
const pubsubTopic = uniquePubsubTopics[0];
const { subscription, error } = await this.createSubscription(
pubsubTopic,
protocolUseOptions
);
if (error) {
return {
subscription: null,
error: error,
results: null
};
}
const { failures, successes } = await subscription.subscribe(
decoders,
callback,
subscribeOptions
);
return {
subscription,
error: null,
results: {
failures: failures,
successes: successes
}
};
}
/**
@ -469,7 +540,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
* @param pubsubTopicShardInfo The pubsub topic to subscribe to.
* @returns The subscription object.
*/
public async createSubscription(
private async createSubscription(
pubsubTopicShardInfo: ShardingParams | PubsubTopic,
options?: ProtocolUseOptions
): Promise<CreateSubscriptionResult> {
@ -516,7 +587,6 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
};
}
//TODO: remove this dependency on IReceiver
/**
* This method is used to satisfy the `IReceiver` interface.
*
@ -532,7 +602,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
* This method should not be used directly.
* Instead, use `createSubscription` to create a new subscription.
*/
public async subscribe<T extends IDecodedMessage>(
public async subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
@ -578,6 +648,21 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
return toAsyncIterator(this, decoders);
}
//TODO: move to SubscriptionManager
private getActiveSubscription(
pubsubTopic: PubsubTopic
): SubscriptionManager | undefined {
return this.activeSubscriptions.get(pubsubTopic);
}
private setActiveSubscription(
pubsubTopic: PubsubTopic,
subscription: SubscriptionManager
): SubscriptionManager {
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
}
private getUniquePubsubTopics<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): string[] {

View File

@ -4,7 +4,7 @@ import {
DecodedMessage,
waitForRemotePeer
} from "@waku/core";
import { ISubscriptionSDK, Protocols } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import type { LightNode } from "@waku/interfaces";
import {
generatePrivateKey,
@ -83,8 +83,6 @@ describe("Waku Message Ephemeral field", function () {
let waku: LightNode;
let nwaku: ServiceNode;
let subscription: ISubscriptionSDK;
afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku);
});
@ -122,11 +120,6 @@ describe("Waku Message Ephemeral field", function () {
Protocols.LightPush,
Protocols.Store
]);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestEncoder.pubsubTopic);
if (error) throw error;
subscription = _subscription;
});
it("Ephemeral messages are not stored", async function () {
@ -218,7 +211,7 @@ describe("Waku Message Ephemeral field", function () {
const callback = (msg: DecodedMessage): void => {
messages.push(msg);
};
await subscription.subscribe([TestDecoder], callback);
await waku.filter.subscribe([TestDecoder], callback);
await delay(200);
const normalTxt = "Normal message";
@ -265,7 +258,7 @@ describe("Waku Message Ephemeral field", function () {
const callback = (msg: DecodedMessage): void => {
messages.push(msg);
};
await subscription.subscribe([decoder], callback);
await waku.filter.subscribe([decoder], callback);
await delay(200);
const normalTxt = "Normal message";
@ -316,7 +309,7 @@ describe("Waku Message Ephemeral field", function () {
const callback = (msg: DecodedMessage): void => {
messages.push(msg);
};
await subscription.subscribe([decoder], callback);
await waku.filter.subscribe([decoder], callback);
await delay(200);
const normalTxt = "Normal message";

View File

@ -28,7 +28,6 @@ describe("Waku Filter: Peer Management: E2E", function () {
this.timeout(15000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let subscription: ISubscriptionSDK;
const contentTopic = "/test";
@ -47,13 +46,6 @@ describe("Waku Filter: Peer Management: E2E", function () {
undefined,
5
);
const { error, subscription: sub } = await waku.filter.createSubscription(
DefaultTestPubsubTopic
);
if (!sub || error) {
throw new Error("Could not create subscription");
}
subscription = sub;
});
afterEachCustom(this, async () => {
@ -62,12 +54,15 @@ describe("Waku Filter: Peer Management: E2E", function () {
it("Number of peers are maintained correctly", async function () {
const messages: DecodedMessage[] = [];
const { failures, successes } = await subscription.subscribe(
[decoder],
(msg) => {
messages.push(msg);
}
);
const { error, results } = await waku.filter.subscribe([decoder], (msg) => {
messages.push(msg);
});
if (error) {
throw error;
}
const { successes, failures } = results;
await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
@ -82,20 +77,42 @@ describe("Waku Filter: Peer Management: E2E", function () {
});
it("Ping succeeds for all connected peers", async function () {
await subscription.subscribe([decoder], () => {});
const { error, subscription } = await waku.filter.subscribe(
[decoder],
() => {}
);
if (error) {
throw error;
}
const pingResult = await subscription.ping();
expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse);
expect(pingResult.failures.length).to.equal(0);
});
it("Ping fails for unsubscribed peers", async function () {
const { error, subscription } = await waku.filter.subscribe(
[decoder],
() => {}
);
if (error) {
throw error;
}
await subscription.unsubscribe([contentTopic]);
const pingResult = await subscription.ping();
expect(pingResult.successes.length).to.equal(0);
expect(pingResult.failures.length).to.be.greaterThan(0);
});
it("Keep-alive pings maintain the connection", async function () {
await subscription.subscribe([decoder], () => {}, { keepAlive: 100 });
const { error, subscription } = await waku.filter.subscribe(
[decoder],
() => {},
undefined,
{ keepAlive: 100 }
);
if (error) {
throw error;
}
await delay(1000);
@ -106,9 +123,17 @@ describe("Waku Filter: Peer Management: E2E", function () {
it("Renews peer on consistent ping failures", async function () {
const maxPingFailures = 3;
await subscription.subscribe([decoder], () => {}, {
pingsBeforePeerRenewed: maxPingFailures
});
const { error, subscription } = await waku.filter.subscribe(
[decoder],
() => {},
undefined,
{
pingsBeforePeerRenewed: maxPingFailures
}
);
if (error) {
throw error;
}
const disconnectedNodePeerId = waku.filter.connectedPeers[0].id;
await waku.connectionManager.dropConnection(disconnectedNodePeerId);
@ -135,9 +160,17 @@ describe("Waku Filter: Peer Management: E2E", function () {
it("Tracks peer failures correctly", async function () {
const maxPingFailures = 3;
await subscription.subscribe([decoder], () => {}, {
pingsBeforePeerRenewed: maxPingFailures
});
const { error, subscription } = await waku.filter.subscribe(
[decoder],
() => {},
undefined,
{
pingsBeforePeerRenewed: maxPingFailures
}
);
if (error) {
throw error;
}
const targetPeer = waku.filter.connectedPeers[0];
await waku.connectionManager.dropConnection(targetPeer.id);
@ -163,18 +196,24 @@ describe("Waku Filter: Peer Management: E2E", function () {
});
it("Maintains correct number of peers after multiple subscribe/unsubscribe cycles", async function () {
let subscription: ISubscriptionSDK;
for (let i = 0; i < 3; i++) {
await subscription.subscribe([decoder], () => {});
const { error, subscription: _subscription } =
await waku.filter.subscribe([decoder], () => {});
if (error) {
throw error;
}
subscription = _subscription;
let pingResult = await subscription.ping();
expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse);
await subscription.unsubscribe([contentTopic]);
pingResult = await subscription.ping();
expect(pingResult.failures.length).to.be.greaterThan(0);
await subscription.subscribe([decoder], () => {});
}
await subscription.subscribe([decoder], () => {});
const finalPingResult = await subscription.ping();
const finalPingResult = await subscription!.ping();
expect(finalPingResult.successes.length).to.equal(
waku.filter.numPeersToUse
);
@ -200,17 +239,15 @@ describe("Waku Filter: Peer Management: E2E", function () {
).toString();
await waku.dial(await nodeWithoutDiscovery.getMultiaddrWithId());
const { error, subscription: sub } = await waku.filter.createSubscription(
DefaultTestPubsubTopic
);
if (!sub || error) {
throw new Error("Could not create subscription");
}
const messages: DecodedMessage[] = [];
const { successes } = await sub.subscribe([decoder], (msg) => {
const { error, results } = await waku.filter.subscribe([decoder], (msg) => {
messages.push(msg);
});
if (error) {
throw error;
}
const { successes } = results;
expect(successes.length).to.be.greaterThan(0);
expect(successes.length).to.be.equal(waku.filter.numPeersToUse);

View File

@ -24,14 +24,9 @@ const runTests = (strictCheckNodes: boolean): void => {
this.timeout(10000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let subscription: ISubscriptionSDK;
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
});
afterEachCustom(this, async () => {
@ -39,10 +34,13 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Ping on subscribed peer", async function () {
await subscription.subscribe(
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
@ -60,14 +58,25 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Ping on peer without subscriptions", async function () {
const { subscription, error } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await subscription.unsubscribe([TestContentTopic]);
await validatePingError(subscription);
});
it("Ping on unsubscribed peer", async function () {
await subscription.subscribe(
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await subscription.ping();
await subscription.unsubscribe([TestContentTopic]);
@ -76,11 +85,17 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Reopen subscription with peer with lost subscription", async function () {
let subscription: ISubscriptionSDK;
const openSubscription = async (): Promise<void> => {
await subscription.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
const { error, subscription: _subscription } =
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
subscription = _subscription;
};
const unsubscribe = async (): Promise<void> => {

View File

@ -1,5 +1,5 @@
import { waitForRemotePeer } from "@waku/core";
import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces";
import { LightNode, Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
@ -29,15 +29,9 @@ const runTests = (strictCheckNodes: boolean): void => {
this.timeout(10000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let subscription: ISubscriptionSDK;
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
});
afterEachCustom(this, async () => {
@ -46,7 +40,7 @@ const runTests = (strictCheckNodes: boolean): void => {
TEST_STRING.forEach((testItem) => {
it(`Check received message containing ${testItem.description}`, async function () {
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -67,7 +61,7 @@ const runTests = (strictCheckNodes: boolean): void => {
TEST_TIMESTAMPS.forEach((testItem) => {
it(`Check received message with timestamp: ${testItem} `, async function () {
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -106,7 +100,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Check message with invalid timestamp is not received", async function () {
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -128,7 +122,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Check message on other pubsub topic is not received", async function () {
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -151,7 +145,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Check message with no content topic is not received", async function () {
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -171,7 +165,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Check message with no payload is not received", async function () {
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -199,7 +193,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Check message with non string payload is not received", async function () {
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -222,7 +216,7 @@ const runTests = (strictCheckNodes: boolean): void => {
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
it.skip("Check message received after jswaku node is restarted", async function () {
// Subscribe and send message
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -242,11 +236,8 @@ const runTests = (strictCheckNodes: boolean): void => {
await waku.dial(await node.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
}
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -271,7 +262,7 @@ const runTests = (strictCheckNodes: boolean): void => {
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
it.skip("Check message received after nwaku node is restarted", async function () {
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);

View File

@ -1,7 +1,6 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import type {
ContentTopicInfo,
ISubscriptionSDK,
LightNode,
ShardInfo,
SingleShardInfo
@ -32,7 +31,6 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
let waku: LightNode;
let nwaku: ServiceNode;
let nwaku2: ServiceNode;
let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector;
const customPubsubTopic1 = singleShardInfoToPubsubTopic({
@ -61,12 +59,6 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(shardInfo);
if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector();
});
@ -75,7 +67,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
});
it("Subscribe and receive messages on custom pubsubtopic", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
@ -86,18 +78,11 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
// Subscribe from the same lightnode to the 2nd pubsubtopic
const { error, subscription: subscription2 } =
await waku.filter.createSubscription(customPubsubTopic2);
if (error) {
throw error;
}
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
const messageCollector2 = new MessageCollector();
await subscription2.subscribe([customDecoder2], messageCollector2.callback);
await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
@ -119,7 +104,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
});
it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
// Set up and start a new nwaku node with customPubsubTopic1
nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
@ -133,19 +118,11 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
// Subscribe from the same lightnode to the new nwaku on the new pubsubtopic
const { error, subscription: subscription2 } =
await waku.filter.createSubscription(customPubsubTopic2);
if (error) {
throw error;
}
await nwaku2.ensureSubscriptions([customPubsubTopic2]);
const messageCollector2 = new MessageCollector();
await subscription2.subscribe([customDecoder2], messageCollector2.callback);
await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
// Making sure that messages are send and reveiced for both subscriptions
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
@ -173,17 +150,6 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
expectedMessageText: "M2"
});
});
it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () {
// this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2`
try {
await subscription.subscribe([customDecoder2], messageCollector.callback);
} catch (error) {
expect((error as Error).message).to.include(
"Pubsub topic not configured"
);
}
});
});
describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
@ -193,7 +159,6 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
let waku: LightNode;
let nwaku: ServiceNode;
let nwaku2: ServiceNode;
let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector;
const customContentTopic1 = "/waku/2/content/utf8";
@ -235,10 +200,6 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, contentTopicInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(autoshardingPubsubTopic1);
if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector();
});
@ -247,7 +208,7 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
});
it("Subscribe and receive messages on autosharded pubsubtopic", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
expect(
await messageCollector.waitForMessagesAutosharding(1, {
@ -262,19 +223,10 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
// Subscribe from the same lightnode to the 2nd pubsubtopic
const { error, subscription: subscription2 } =
await waku.filter.createSubscription(autoshardingPubsubTopic2);
if (error) {
throw error;
}
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
const messageCollector2 = new MessageCollector();
await subscription2.subscribe([customDecoder2], messageCollector2.callback);
await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
@ -304,7 +256,7 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
});
it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
// Set up and start a new nwaku node with customPubsubTopic1
nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
@ -319,19 +271,11 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
// Subscribe from the same lightnode to the new nwaku on the new pubsubtopic
const { error, subscription: subscription2 } =
await waku.filter.createSubscription(autoshardingPubsubTopic2);
if (error) {
throw error;
}
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
const messageCollector2 = new MessageCollector();
await subscription2.subscribe([customDecoder2], messageCollector2.callback);
await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
// Making sure that messages are send and reveiced for both subscriptions
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
@ -363,7 +307,7 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () {
// this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2`
try {
await subscription.subscribe([customDecoder2], messageCollector.callback);
await waku.filter.subscribe([customDecoder2], messageCollector.callback);
} catch (error) {
expect((error as Error).message).to.include(
"Pubsub topic not configured"
@ -378,7 +322,6 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
let waku: LightNode;
let nwaku: ServiceNode;
let nwaku2: ServiceNode;
let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector;
const customPubsubTopic1 = singleShardInfoToPubsubTopic({
@ -408,11 +351,6 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(customPubsubTopic1);
if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector();
});
@ -421,7 +359,7 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
});
it("Subscribe and receive messages on custom pubsubtopic", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
@ -432,18 +370,11 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
// Subscribe from the same lightnode to the 2nd pubsubtopic
const { error, subscription: subscription2 } =
await waku.filter.createSubscription(customPubsubTopic2);
if (error) {
throw error;
}
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
const messageCollector2 = new MessageCollector();
await subscription2.subscribe([customDecoder2], messageCollector2.callback);
await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
@ -465,7 +396,7 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
});
it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
await waku.filter.subscribe([customDecoder1], messageCollector.callback);
// Set up and start a new nwaku node with customPubsubTopic1
nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
@ -479,17 +410,11 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
// Subscribe from the same lightnode to the new nwaku on the new pubsubtopic
const { error, subscription: subscription2 } =
await waku.filter.createSubscription(customPubsubTopic2);
if (error) {
throw error;
}
await nwaku2.ensureSubscriptions([customPubsubTopic2]);
const messageCollector2 = new MessageCollector();
await subscription2.subscribe([customDecoder2], messageCollector2.callback);
await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
// Making sure that messages are send and reveiced for both subscriptions
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
@ -521,7 +446,7 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () {
// this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2`
try {
await subscription.subscribe([customDecoder2], messageCollector.callback);
await waku.filter.subscribe([customDecoder2], messageCollector.callback);
} catch (error) {
expect((error as Error).message).to.include(
"Pubsub topic not configured"

View File

@ -24,16 +24,10 @@ describe("Waku Filter V2: Ping", function () {
this.timeout(10000);
let waku: LightNode;
let nwaku: ServiceNode;
let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector;
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector();
});
@ -42,7 +36,13 @@ describe("Waku Filter V2: Ping", function () {
});
it("Ping on subscribed peer", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
const { subscription, error } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
@ -56,11 +56,26 @@ describe("Waku Filter V2: Ping", function () {
});
it("Ping on peer without subscriptions", async function () {
const { subscription, error } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await subscription.unsubscribe([TestContentTopic]);
await validatePingError(subscription);
});
it("Ping on unsubscribed peer", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await subscription.ping();
await subscription.unsubscribe([TestContentTopic]);
@ -69,8 +84,16 @@ describe("Waku Filter V2: Ping", function () {
});
it("Reopen subscription with peer with lost subscription", async function () {
let subscription: ISubscriptionSDK;
const openSubscription = async (): Promise<void> => {
await subscription.subscribe([TestDecoder], messageCollector.callback);
const result = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (result.error) {
throw result.error;
}
subscription = result.subscription;
};
const unsubscribe = async (): Promise<void> => {

View File

@ -1,5 +1,5 @@
import { waitForRemotePeer } from "@waku/core";
import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces";
import { LightNode, Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
@ -28,17 +28,10 @@ describe("Waku Filter V2: FilterPush", function () {
this.timeout(10000);
let waku: LightNode;
let nwaku: ServiceNode;
let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector;
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector(nwaku);
});
@ -48,7 +41,7 @@ describe("Waku Filter V2: FilterPush", function () {
TEST_STRING.forEach((testItem) => {
it(`Check received message containing ${testItem.description}`, async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(testItem.value)
});
@ -63,7 +56,7 @@ describe("Waku Filter V2: FilterPush", function () {
TEST_TIMESTAMPS.forEach((testItem) => {
it(`Check received message with timestamp: ${testItem} `, async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
@ -97,7 +90,7 @@ describe("Waku Filter V2: FilterPush", function () {
});
it("Check message with invalid timestamp is not received", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
@ -116,7 +109,7 @@ describe("Waku Filter V2: FilterPush", function () {
});
it("Check message on other pubsub topic is not received", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
@ -134,7 +127,7 @@ describe("Waku Filter V2: FilterPush", function () {
});
it("Check message with no pubsub topic is not received", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
@ -152,7 +145,7 @@ describe("Waku Filter V2: FilterPush", function () {
});
it("Check message with no content topic is not received", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
@ -169,7 +162,7 @@ describe("Waku Filter V2: FilterPush", function () {
});
it("Check message with no payload is not received", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
@ -191,7 +184,7 @@ describe("Waku Filter V2: FilterPush", function () {
});
it("Check message with non string payload is not received", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await delay(400);
await nwaku.restCall<boolean>(
@ -211,7 +204,7 @@ describe("Waku Filter V2: FilterPush", function () {
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
it.skip("Check message received after jswaku node is restarted", async function () {
// Subscribe and send message
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
@ -224,11 +217,8 @@ describe("Waku Filter V2: FilterPush", function () {
// Redo the connection and create a new subscription
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
const { error, subscription: _subscription } =
await waku.filter.createSubscription();
if (error) throw error;
subscription = _subscription;
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
@ -246,7 +236,7 @@ describe("Waku Filter V2: FilterPush", function () {
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
it.skip("Check message received after nwaku node is restarted", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);

View File

@ -1,5 +1,5 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces";
import { LightNode, Protocols } from "@waku/interfaces";
import {
ecies,
generatePrivateKey,
@ -40,18 +40,11 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
let waku2: LightNode;
let nwaku: ServiceNode;
let nwaku2: ServiceNode;
let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector;
let ctx: Context;
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector();
await nwaku.ensureSubscriptions([TestPubsubTopic]);
});
@ -61,7 +54,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
});
it("Subscribe and receive messages via lightPush", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
const { error } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, messagePayload);
@ -88,7 +87,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
TestPubsubTopic
);
await subscription.subscribe([decoder], messageCollector.callback);
const { error } = await waku.filter.subscribe(
[decoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(encoder, messagePayload);
@ -115,7 +120,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
TestPubsubTopic
);
await subscription.subscribe([decoder], messageCollector.callback);
const { error } = await waku.filter.subscribe(
[decoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(encoder, messagePayload);
@ -130,7 +141,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
});
it("Subscribe and receive messages via waku relay post", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
const { error } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await delay(400);
@ -152,7 +169,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
});
it("Subscribe and receive 2 messages on the same topic", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, messagePayload);
@ -181,7 +198,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
it("Subscribe and receive messages on 2 different content topics", async function () {
// Subscribe to the first content topic and send a message.
await subscription.subscribe([TestDecoder], messageCollector.callback);
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
@ -227,7 +250,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
// Subscribe to all 20 topics.
for (let i = 0; i < topicCount; i++) {
await subscription.subscribe([td.decoders[i]], messageCollector.callback);
await waku.filter.subscribe([td.decoders[i]], messageCollector.callback);
}
// Send a unique message on each topic.
@ -253,7 +276,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
const topicCount = 100;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
await subscription.subscribe(td.decoders, messageCollector.callback);
await waku.filter.subscribe(td.decoders, messageCollector.callback);
// Send a unique message on each topic.
for (let i = 0; i < topicCount; i++) {
@ -278,10 +301,14 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
try {
const { failures, successes } = await subscription.subscribe(
const { error, results } = await waku.filter.subscribe(
td.decoders,
messageCollector.callback
);
if (error) {
throw error;
}
const { failures, successes } = results;
if (failures.length === 0 || successes.length > 0) {
throw new Error(
`Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.`
@ -309,10 +336,10 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
const td2 = generateTestData(topicCount2, { pubsubTopic: TestPubsubTopic });
// Subscribe to the first set of topics.
await subscription.subscribe(td1.decoders, messageCollector.callback);
await waku.filter.subscribe(td1.decoders, messageCollector.callback);
// Subscribe to the second set of topics which has overlapping topics with the first set.
await subscription.subscribe(td2.decoders, messageCollector.callback);
await waku.filter.subscribe(td2.decoders, messageCollector.callback);
// Send messages to the first set of topics.
for (let i = 0; i < topicCount1; i++) {
@ -339,11 +366,11 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
});
it("Refresh subscription", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
// Resubscribe (refresh) to the same topic and send another message.
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
// Confirm both messages were received.
@ -371,7 +398,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await subscription.subscribe([newDecoder], messageCollector.callback);
await waku.filter.subscribe([newDecoder], messageCollector.callback);
await waku.lightPush.send(newEncoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
@ -384,22 +411,16 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
});
it("Add multiple subscription objects on single nwaku node", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
// Create a second subscription on a different topic
const { error, subscription: subscription2 } =
await waku.filter.createSubscription(TestShardInfo);
if (error) {
throw error;
}
const newContentTopic = "/test/2/waku-filter/default";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await subscription2.subscribe([newDecoder], messageCollector.callback);
await waku.filter.subscribe([newDecoder], messageCollector.callback);
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
@ -418,17 +439,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
});
it("Subscribe and receive messages from multiple nwaku nodes", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.filter.subscribe([TestDecoder], messageCollector.callback);
// Set up and start a new nwaku node
[nwaku2, waku2] = await runNodes(ctx, TestShardInfo);
await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
const { error, subscription: subscription2 } =
await waku.filter.createSubscription(TestShardInfo);
if (error) {
throw error;
}
await nwaku2.ensureSubscriptions([TestPubsubTopic]);
// Send a message using the new subscription
const newContentTopic = "/test/2/waku-filter/default";
@ -437,7 +454,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await subscription2.subscribe([newDecoder], messageCollector.callback);
await waku.filter.subscribe([newDecoder], messageCollector.callback);
// Making sure that messages are send and reveiced for both subscriptions
while (!(await messageCollector.waitForMessages(2))) {

View File

@ -1,5 +1,4 @@
import { createDecoder, createEncoder } from "@waku/core";
import { ISubscriptionSDK } from "@waku/interfaces";
import { LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
@ -28,16 +27,11 @@ describe("Waku Filter V2: Unsubscribe", function () {
this.timeout(10000);
let waku: LightNode;
let nwaku: ServiceNode;
let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector;
beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector();
await nwaku.ensureSubscriptions([TestPubsubTopic]);
});
@ -47,7 +41,13 @@ describe("Waku Filter V2: Unsubscribe", function () {
});
it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
const { subscription, error } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await messageCollector.waitForMessages(1)).to.eq(true);
@ -68,7 +68,13 @@ describe("Waku Filter V2: Unsubscribe", function () {
it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
await subscription.subscribe([TestDecoder], messageCollector.callback);
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
const newContentTopic = "/test/2/waku-filter";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
@ -93,7 +99,13 @@ describe("Waku Filter V2: Unsubscribe", function () {
it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
await subscription.subscribe([TestDecoder], messageCollector.callback);
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
const newContentTopic = "/test/2/waku-filter/default";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
@ -118,7 +130,13 @@ describe("Waku Filter V2: Unsubscribe", function () {
it("Unsubscribe topics the node is not subscribed to", async function () {
// Subscribe to 1 topic and send message
await subscription.subscribe([TestDecoder], messageCollector.callback);
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
@ -136,7 +154,13 @@ describe("Waku Filter V2: Unsubscribe", function () {
});
it("Unsubscribes all - node subscribed to 1 topic", async function () {
await subscription.subscribe([TestDecoder], messageCollector.callback);
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
expect(messageCollector.count).to.eq(1);
@ -155,7 +179,13 @@ describe("Waku Filter V2: Unsubscribe", function () {
// Subscribe to 10 topics and send message
const topicCount = 10;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
await subscription.subscribe(td.decoders, messageCollector.callback);
const { error, subscription } = await waku.filter.subscribe(
td.decoders,
messageCollector.callback
);
if (error) {
throw error;
}
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`M${i + 1}`)

View File

@ -1,5 +1,5 @@
import { createDecoder, createEncoder } from "@waku/core";
import { ISubscriptionSDK, LightNode } from "@waku/interfaces";
import { LightNode } from "@waku/interfaces";
import {
ecies,
generatePrivateKey,
@ -36,7 +36,6 @@ const runTests = (strictCheckNodes: boolean): void => {
this.timeout(100000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let subscription: ISubscriptionSDK;
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
@ -45,12 +44,6 @@ const runTests = (strictCheckNodes: boolean): void => {
undefined,
strictCheckNodes
);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (!error) {
subscription = _subscription;
}
});
afterEachCustom(this, async () => {
@ -60,7 +53,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Subscribe and receive messages via lightPush", async function () {
expect(waku.libp2p.getConnections()).has.length(3);
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -92,7 +85,7 @@ const runTests = (strictCheckNodes: boolean): void => {
TestPubsubTopic
);
await subscription.subscribe(
await waku.filter.subscribe(
[decoder],
serviceNodes.messageCollector.callback
);
@ -125,7 +118,7 @@ const runTests = (strictCheckNodes: boolean): void => {
TestPubsubTopic
);
await subscription.subscribe(
await waku.filter.subscribe(
[decoder],
serviceNodes.messageCollector.callback
);
@ -146,7 +139,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Subscribe and receive messages via waku relay post", async function () {
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -173,7 +166,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Subscribe and receive 2 messages on the same topic", async function () {
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -208,7 +201,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Subscribe and receive messages on 2 different content topics", async function () {
// Subscribe to the first content topic and send a message.
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -231,7 +224,7 @@ const runTests = (strictCheckNodes: boolean): void => {
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await subscription.subscribe(
await waku.filter.subscribe(
[newDecoder],
serviceNodes.messageCollector.callback
);
@ -267,7 +260,7 @@ const runTests = (strictCheckNodes: boolean): void => {
// Subscribe to all 20 topics.
for (let i = 0; i < topicCount; i++) {
await subscription.subscribe(
await waku.filter.subscribe(
[td.decoders[i]],
serviceNodes.messageCollector.callback
);
@ -298,7 +291,7 @@ const runTests = (strictCheckNodes: boolean): void => {
const topicCount = 100;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
await subscription.subscribe(
await waku.filter.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
@ -328,10 +321,14 @@ const runTests = (strictCheckNodes: boolean): void => {
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
try {
const { failures, successes } = await subscription.subscribe(
const { error, results } = await waku.filter.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
const { failures, successes } = results;
if (failures.length === 0 || successes.length > 0) {
throw new Error(
`Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.`
@ -363,13 +360,13 @@ const runTests = (strictCheckNodes: boolean): void => {
});
// Subscribe to the first set of topics.
await subscription.subscribe(
await waku.filter.subscribe(
td1.decoders,
serviceNodes.messageCollector.callback
);
// Subscribe to the second set of topics which has overlapping topics with the first set.
await subscription.subscribe(
await waku.filter.subscribe(
td2.decoders,
serviceNodes.messageCollector.callback
);
@ -398,14 +395,14 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Refresh subscription", async function () {
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
// Resubscribe (refresh) to the same topic and send another message.
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -436,7 +433,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await subscription.subscribe(
await waku.filter.subscribe(
[newDecoder],
serviceNodes.messageCollector.callback
);
@ -454,25 +451,19 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Add multiple subscription objects on single nwaku node", async function () {
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
// Create a second subscription on a different topic
const { error, subscription: subscription2 } =
await waku.filter.createSubscription(TestShardInfo);
if (error) {
throw error;
}
const newContentTopic = "/test/2/waku-filter/default";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await subscription2.subscribe(
await waku.filter.subscribe(
[newDecoder],
serviceNodes.messageCollector.callback
);

View File

@ -1,5 +1,5 @@
import { createDecoder, createEncoder } from "@waku/core";
import { ISubscriptionSDK, LightNode } from "@waku/interfaces";
import { type LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
@ -28,22 +28,12 @@ const runTests = (strictCheckNodes: boolean): void => {
this.timeout(10000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
let subscription: ISubscriptionSDK;
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(this.ctx, {
contentTopics: [TestContentTopic],
clusterId: ClusterId
});
const { error, subscription: _subscription } =
await waku.filter.createSubscription({
contentTopics: [TestContentTopic],
clusterId: ClusterId
});
if (!error) {
subscription = _subscription;
}
});
afterEachCustom(this, async () => {
@ -51,10 +41,13 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {
await subscription.subscribe(
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, messagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
@ -79,17 +72,20 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
await subscription.subscribe(
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
const newContentTopic = "/test/2/waku-filter";
const newEncoder = createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await subscription.subscribe(
await waku.filter.subscribe(
[newDecoder],
serviceNodes.messageCollector.callback
);
@ -114,7 +110,7 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () {
// Subscribe to 2 topics and send messages
await subscription.subscribe(
await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
@ -124,10 +120,13 @@ const runTests = (strictCheckNodes: boolean): void => {
pubsubTopic: TestPubsubTopic
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await subscription.subscribe(
const { error, subscription } = await waku.filter.subscribe(
[newDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
@ -149,10 +148,13 @@ const runTests = (strictCheckNodes: boolean): void => {
it("Unsubscribe topics the node is not subscribed to", async function () {
// Subscribe to 1 topic and send message
await subscription.subscribe(
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
@ -174,10 +176,13 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Unsubscribes all - node subscribed to 1 topic", async function () {
await subscription.subscribe(
const { error, subscription } = await waku.filter.subscribe(
[TestDecoder],
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true
@ -200,10 +205,13 @@ const runTests = (strictCheckNodes: boolean): void => {
// Subscribe to 10 topics and send message
const topicCount = 10;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
await subscription.subscribe(
const { error, subscription } = await waku.filter.subscribe(
td.decoders,
serviceNodes.messageCollector.callback
);
if (error) {
throw error;
}
for (let i = 0; i < topicCount; i++) {
await waku.lightPush.send(td.encoders[i], {
payload: utf8ToBytes(`M${i + 1}`)

View File

@ -10,7 +10,12 @@ import {
ServiceNodesFleet
} from "../../src";
import { messagePayload, TestEncoder, TestShardInfo } from "./utils";
import {
messagePayload,
TestDecoder,
TestEncoder,
TestShardInfo
} from "./utils";
describe("Node Health Status Matrix Tests", function () {
let waku: LightNode;
@ -46,7 +51,7 @@ describe("Node Health Status Matrix Tests", function () {
}
if (filterPeers > 0) {
await waku.filter.createSubscription(TestShardInfo);
await waku.filter.subscribe([TestDecoder], () => {});
}
const lightPushHealth = waku.health.getProtocolStatus(

View File

@ -67,14 +67,12 @@ describe("Health Manager", function () {
num
);
const { error, subscription } =
await waku.filter.createSubscription(TestShardInfo);
const { error } = await waku.filter.subscribe([TestDecoder], () => {});
if (error) {
expect(error).to.not.equal(undefined);
}
await subscription?.subscribe([TestDecoder], () => {});
const health = waku.health.getProtocolStatus(Protocols.Filter);
if (!health) {
expect(health).to.not.equal(undefined);

View File

@ -67,10 +67,10 @@ describe("Waku Relay", function () {
const symDecoder = createSymDecoder(symTopic, symKey, TestPubsubTopic);
const msgs: DecodedMessage[] = [];
void waku2.relay.subscribe([eciesDecoder], (wakuMsg) => {
void waku2.relay.subscribeWithUnsubscribe([eciesDecoder], (wakuMsg) => {
msgs.push(wakuMsg);
});
void waku2.relay.subscribe([symDecoder], (wakuMsg) => {
void waku2.relay.subscribeWithUnsubscribe([symDecoder], (wakuMsg) => {
msgs.push(wakuMsg);
});
@ -97,7 +97,7 @@ describe("Waku Relay", function () {
// The promise **fails** if we receive a message on this observer.
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve, reject) => {
const deleteObserver = waku2.relay.subscribe(
const deleteObserver = waku2.relay.subscribeWithUnsubscribe(
[createDecoder(contentTopic)],
reject
) as () => void;

View File

@ -76,8 +76,9 @@ describe("Waku Relay, Interop", function () {
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
void waku.relay.subscribe<DecodedMessage>(TestDecoder, (msg) =>
resolve(msg)
void waku.relay.subscribeWithUnsubscribe<DecodedMessage>(
TestDecoder,
(msg) => resolve(msg)
);
}
);
@ -119,7 +120,7 @@ describe("Waku Relay, Interop", function () {
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
void waku2.relay.subscribe(TestDecoder, resolve);
void waku2.relay.subscribeWithUnsubscribe(TestDecoder, resolve);
}
);

View File

@ -124,9 +124,18 @@ describe("Waku Relay, multiple pubsub topics", function () {
waitForRemotePeer(waku3, [Protocols.Relay])
]);
await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback);
await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback);
await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback);
await waku1.relay.subscribeWithUnsubscribe(
[testItem.decoder],
msgCollector1.callback
);
await waku2.relay.subscribeWithUnsubscribe(
[testItem.decoder],
msgCollector2.callback
);
await waku3.relay.subscribeWithUnsubscribe(
[testItem.decoder],
msgCollector3.callback
);
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
const relayResponse1 = await waku1.relay.send(testItem.encoder, {
@ -222,15 +231,18 @@ describe("Waku Relay, multiple pubsub topics", function () {
waitForRemotePeer(waku3, [Protocols.Relay])
]);
await waku1.relay.subscribe(
await waku1.relay.subscribeWithUnsubscribe(
[customDecoder1, customDecoder2],
msgCollector1.callback
);
await waku2.relay.subscribe(
await waku2.relay.subscribeWithUnsubscribe(
[customDecoder1, customDecoder2],
msgCollector2.callback
);
await waku3.relay.subscribe([customDecoder1], msgCollector3.callback);
await waku3.relay.subscribeWithUnsubscribe(
[customDecoder1],
msgCollector3.callback
);
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
// However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic
@ -290,7 +302,7 @@ describe("Waku Relay, multiple pubsub topics", function () {
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
void waku2.relay.subscribe([customDecoder1], resolve);
void waku2.relay.subscribeWithUnsubscribe([customDecoder1], resolve);
}
);
@ -298,7 +310,7 @@ describe("Waku Relay, multiple pubsub topics", function () {
// pubsub topic.
const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve, reject) => {
void waku3.relay.subscribe([TestDecoder], reject);
void waku3.relay.subscribeWithUnsubscribe([TestDecoder], reject);
setTimeout(resolve, 1000);
}
);
@ -417,9 +429,18 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
waitForRemotePeer(waku3, [Protocols.Relay])
]);
await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback);
await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback);
await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback);
await waku1.relay.subscribeWithUnsubscribe(
[testItem.decoder],
msgCollector1.callback
);
await waku2.relay.subscribeWithUnsubscribe(
[testItem.decoder],
msgCollector2.callback
);
await waku3.relay.subscribeWithUnsubscribe(
[testItem.decoder],
msgCollector3.callback
);
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
const relayResponse1 = await waku1.relay.send(testItem.encoder, {
@ -524,15 +545,18 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
waitForRemotePeer(waku3, [Protocols.Relay])
]);
await waku1.relay.subscribe(
await waku1.relay.subscribeWithUnsubscribe(
[customDecoder1, customDecoder2],
msgCollector1.callback
);
await waku2.relay.subscribe(
await waku2.relay.subscribeWithUnsubscribe(
[customDecoder1, customDecoder2],
msgCollector2.callback
);
await waku3.relay.subscribe([customDecoder1], msgCollector3.callback);
await waku3.relay.subscribeWithUnsubscribe(
[customDecoder1],
msgCollector3.callback
);
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
// However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic
@ -619,7 +643,7 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
void waku2.relay.subscribe([customDecoder1], resolve);
void waku2.relay.subscribeWithUnsubscribe([customDecoder1], resolve);
}
);
@ -627,7 +651,7 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
// pubsub topic.
const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve, reject) => {
void waku3.relay.subscribe([TestDecoder], reject);
void waku3.relay.subscribeWithUnsubscribe([TestDecoder], reject);
setTimeout(resolve, 1000);
}
);
@ -725,9 +749,18 @@ describe("Waku Relay (named sharding), multiple pubsub topics", function () {
waitForRemotePeer(waku3, [Protocols.Relay])
]);
await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback);
await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback);
await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback);
await waku1.relay.subscribeWithUnsubscribe(
[testItem.decoder],
msgCollector1.callback
);
await waku2.relay.subscribeWithUnsubscribe(
[testItem.decoder],
msgCollector2.callback
);
await waku3.relay.subscribeWithUnsubscribe(
[testItem.decoder],
msgCollector3.callback
);
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
const relayResponse1 = await waku1.relay.send(testItem.encoder, {
@ -823,15 +856,18 @@ describe("Waku Relay (named sharding), multiple pubsub topics", function () {
waitForRemotePeer(waku3, [Protocols.Relay])
]);
await waku1.relay.subscribe(
await waku1.relay.subscribeWithUnsubscribe(
[customDecoder1, customDecoder2],
msgCollector1.callback
);
await waku2.relay.subscribe(
await waku2.relay.subscribeWithUnsubscribe(
[customDecoder1, customDecoder2],
msgCollector2.callback
);
await waku3.relay.subscribe([customDecoder1], msgCollector3.callback);
await waku3.relay.subscribeWithUnsubscribe(
[customDecoder1],
msgCollector3.callback
);
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
// However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic
@ -891,7 +927,7 @@ describe("Waku Relay (named sharding), multiple pubsub topics", function () {
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
void waku2.relay.subscribe([customDecoder1], resolve);
void waku2.relay.subscribeWithUnsubscribe([customDecoder1], resolve);
}
);
@ -899,7 +935,7 @@ describe("Waku Relay (named sharding), multiple pubsub topics", function () {
// pubsub topic.
const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve, reject) => {
void waku3.relay.subscribe([TestDecoder], reject);
void waku3.relay.subscribeWithUnsubscribe([TestDecoder], reject);
setTimeout(resolve, 1000);
}
);

View File

@ -35,7 +35,10 @@ describe("Waku Relay, Publish", function () {
beforeEachCustom(this, async () => {
[waku1, waku2] = await runJSNodes();
messageCollector = new MessageCollector();
await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
await waku2.relay.subscribeWithUnsubscribe(
[TestDecoder],
messageCollector.callback
);
});
afterEachCustom(this, async () => {

View File

@ -85,7 +85,10 @@ describe("Waku Relay, Subscribe", function () {
});
it("Subscribe and publish message", async function () {
await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
await waku2.relay.subscribeWithUnsubscribe(
[TestDecoder],
messageCollector.callback
);
await waku1.relay.send(TestEncoder, { payload: utf8ToBytes(messageText) });
expect(
await messageCollector.waitForMessages(1, TestWaitMessageOptions)
@ -98,7 +101,10 @@ describe("Waku Relay, Subscribe", function () {
it("Subscribe and publish 10000 messages on the same topic", async function () {
const messageCount = 10000;
await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
await waku2.relay.subscribeWithUnsubscribe(
[TestDecoder],
messageCollector.callback
);
// Send a unique message on each topic.
for (let i = 0; i < messageCount; i++) {
await waku1.relay.send(TestEncoder, {
@ -131,8 +137,14 @@ describe("Waku Relay, Subscribe", function () {
});
const secondDecoder = createDecoder(secondContentTopic, TestPubsubTopic);
await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
await waku2.relay.subscribe([secondDecoder], messageCollector.callback);
await waku2.relay.subscribeWithUnsubscribe(
[TestDecoder],
messageCollector.callback
);
await waku2.relay.subscribeWithUnsubscribe(
[secondDecoder],
messageCollector.callback
);
await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") });
await waku1.relay.send(secondEncoder, { payload: utf8ToBytes("M2") });
expect(
@ -158,7 +170,10 @@ describe("Waku Relay, Subscribe", function () {
// Subscribe to topics one by one
for (let i = 0; i < topicCount; i++) {
await waku2.relay.subscribe([td.decoders[i]], messageCollector.callback);
await waku2.relay.subscribeWithUnsubscribe(
[td.decoders[i]],
messageCollector.callback
);
}
// Send a unique message on each topic.
@ -189,7 +204,10 @@ describe("Waku Relay, Subscribe", function () {
const td = generateTestData(topicCount, TestWaitMessageOptions);
// Subscribe to all topics at once
await waku2.relay.subscribe(td.decoders, messageCollector.callback);
await waku2.relay.subscribeWithUnsubscribe(
td.decoders,
messageCollector.callback
);
// Send a unique message on each topic.
for (let i = 0; i < topicCount; i++) {
@ -217,8 +235,14 @@ describe("Waku Relay, Subscribe", function () {
// Will be skipped until https://github.com/waku-org/js-waku/issues/1678 is fixed
it.skip("Refresh subscription", async function () {
await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
await waku2.relay.subscribeWithUnsubscribe(
[TestDecoder],
messageCollector.callback
);
await waku2.relay.subscribeWithUnsubscribe(
[TestDecoder],
messageCollector.callback
);
await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") });
@ -239,9 +263,15 @@ describe("Waku Relay, Subscribe", function () {
const td2 = generateTestData(topicCount2, TestWaitMessageOptions);
// Subscribe to the first set of topics.
await waku2.relay.subscribe(td1.decoders, messageCollector.callback);
await waku2.relay.subscribeWithUnsubscribe(
td1.decoders,
messageCollector.callback
);
// Subscribe to the second set of topics which has overlapping topics with the first set.
await waku2.relay.subscribe(td2.decoders, messageCollector.callback);
await waku2.relay.subscribeWithUnsubscribe(
td2.decoders,
messageCollector.callback
);
// Send messages to the first set of topics.
for (let i = 0; i < topicCount1; i++) {
@ -278,7 +308,10 @@ describe("Waku Relay, Subscribe", function () {
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku2.relay.subscribe([newDecoder], messageCollector.callback);
await waku2.relay.subscribeWithUnsubscribe(
[newDecoder],
messageCollector.callback
);
await waku1.relay.send(newEncoder, {
payload: utf8ToBytes(messageText)
});

View File

@ -223,7 +223,7 @@ describe("Decryption Keys", function () {
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
void waku2.relay.subscribe([decoder], resolve);
void waku2.relay.subscribeWithUnsubscribe([decoder], resolve);
}
);

View File

@ -37,9 +37,12 @@ export async function toAsyncIterator<T extends IDecodedMessage>(
const messages: T[] = [];
let unsubscribe: undefined | Unsubscribe;
unsubscribe = await receiver.subscribe(decoder, (message: T) => {
messages.push(message);
});
unsubscribe = await receiver.subscribeWithUnsubscribe(
decoder,
(message: T) => {
messages.push(message);
}
);
const isWithTimeout = Number.isInteger(iteratorOptions?.timeoutMs);
const timeoutMs = iteratorOptions?.timeoutMs ?? 0;