diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index b4071a4d26..4a315d7279 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -5,6 +5,7 @@ import type { IBaseProtocolCore, IBaseProtocolSDK, ProtocolError, + ProtocolUseOptions, SDKProtocolResult, ShardingParams } from "./protocols.js"; @@ -34,7 +35,7 @@ export type IFilterSDK = IReceiver & IBaseProtocolSDK & { protocol: IBaseProtocolCore } & { createSubscription( pubsubTopicShardInfo?: ShardingParams | PubsubTopic, - options?: SubscribeOptions + options?: ProtocolUseOptions ): Promise; }; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 0371587ae7..9aeab47285 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -43,6 +43,33 @@ export type ApplicationInfo = { export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo; +//TODO: merge this with ProtocolCreateOptions or establish distinction: https://github.com/waku-org/js-waku/issues/2048 +/** + * Options for using LightPush and Filter + */ +export type ProtocolUseOptions = { + /** + * Optional flag to enable auto-retry with exponential backoff + */ + autoRetry?: boolean; + /** + * Optional flag to force using all available peers + */ + forceUseAllPeers?: boolean; + /** + * Optional maximum number of attempts for exponential backoff + */ + maxAttempts?: number; + /** + * Optional initial delay in milliseconds for exponential backoff + */ + initialDelay?: number; + /** + * Optional maximum delay in milliseconds for exponential backoff + */ + maxDelay?: number; +}; + export type ProtocolCreateOptions = { /** * @deprecated diff --git a/packages/interfaces/src/sender.ts b/packages/interfaces/src/sender.ts index 672395e84c..c195403a7e 100644 --- a/packages/interfaces/src/sender.ts +++ b/packages/interfaces/src/sender.ts @@ -1,36 +1,10 @@ import type { IEncoder, IMessage } from "./message.js"; -import { SDKProtocolResult } from "./protocols.js"; +import { ProtocolUseOptions, SDKProtocolResult } from "./protocols.js"; export interface ISender { send: ( encoder: IEncoder, message: IMessage, - sendOptions?: SendOptions + sendOptions?: ProtocolUseOptions ) => Promise; } - -/** - * Options for using LightPush - */ -export type SendOptions = { - /** - * Optional flag to enable auto-retry with exponential backoff - */ - autoRetry?: boolean; - /** - * Optional flag to force using all available peers - */ - forceUseAllPeers?: boolean; - /** - * Optional maximum number of attempts for exponential backoff - */ - maxAttempts?: number; - /** - * Optional initial delay in milliseconds for exponential backoff - */ - initialDelay?: number; - /** - * Optional maximum delay in milliseconds for exponential backoff - */ - maxDelay?: number; -}; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 9688859cea..b1a845df19 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -1,7 +1,7 @@ import type { Peer, PeerId } from "@libp2p/interface"; import { ConnectionManager } from "@waku/core"; import { BaseProtocol } from "@waku/core/lib/base_protocol"; -import { IBaseProtocolSDK, SendOptions } from "@waku/interfaces"; +import { IBaseProtocolSDK, ProtocolUseOptions } from "@waku/interfaces"; import { delay, Logger } from "@waku/utils"; interface Options { @@ -86,7 +86,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100) */ protected hasPeers = async ( - options: Partial = {} + options: Partial = {} ): Promise => { const { autoRetry = false, diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 559279c314..cd3db714e2 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -14,6 +14,7 @@ import { type Libp2p, type ProtocolCreateOptions, ProtocolError, + ProtocolUseOptions, type PubsubTopic, SDKProtocolResult, type ShardingParams, @@ -45,7 +46,6 @@ const DEFAULT_SUBSCRIBE_OPTIONS = { }; export class SubscriptionManager implements ISubscriptionSDK { private readonly pubsubTopic: PubsubTopic; - readonly peers: Peer[]; readonly receivedMessagesHashStr: string[] = []; private keepAliveTimer: number | null = null; @@ -56,10 +56,9 @@ export class SubscriptionManager implements ISubscriptionSDK { constructor( pubsubTopic: PubsubTopic, - remotePeers: Peer[], + private peers: Peer[], private protocol: FilterCore ) { - this.peers = remotePeers; this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); } @@ -314,8 +313,14 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { * @returns The subscription object. */ async createSubscription( - pubsubTopicShardInfo: ShardingParams | PubsubTopic + pubsubTopicShardInfo: ShardingParams | PubsubTopic, + options?: ProtocolUseOptions ): Promise { + options = { + autoRetry: true, + ...options + } as ProtocolUseOptions; + const pubsubTopic = typeof pubsubTopicShardInfo == "string" ? pubsubTopicShardInfo @@ -323,17 +328,8 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); - let peers: Peer[] = []; - try { - peers = await this.protocol.getPeers(); - } catch (error) { - log.error("Error getting peers to initiate subscription: ", error); - return { - error: ProtocolError.GENERIC_FAIL, - subscription: null - }; - } - if (peers.length === 0) { + const hasPeers = await this.hasPeers(options); + if (!hasPeers) { return { error: ProtocolError.NO_PEER_AVAILABLE, subscription: null @@ -341,15 +337,15 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { } log.info( - `Creating filter subscription with ${peers.length} peers: `, - peers.map((peer) => peer.id.toString()) + `Creating filter subscription with ${this.connectedPeers.length} peers: `, + this.connectedPeers.map((peer) => peer.id.toString()) ); const subscription = this.getActiveSubscription(pubsubTopic) ?? this.setActiveSubscription( pubsubTopic, - new SubscriptionManager(pubsubTopic, peers, this.protocol) + new SubscriptionManager(pubsubTopic, this.connectedPeers, this.protocol) ); return { diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/light_push.ts index ae52e06e49..0ae05056dd 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/light_push.ts @@ -8,8 +8,8 @@ import { type Libp2p, type ProtocolCreateOptions, ProtocolError, - SDKProtocolResult, - SendOptions + ProtocolUseOptions, + SDKProtocolResult } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; @@ -35,12 +35,12 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { async send( encoder: IEncoder, message: IMessage, - _options?: SendOptions + _options?: ProtocolUseOptions ): Promise { const options = { autoRetry: true, ..._options - } as SendOptions; + } as ProtocolUseOptions; const successes: PeerId[] = []; const failures: Failure[] = []; diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts new file mode 100644 index 0000000000..1d4354aeba --- /dev/null +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -0,0 +1,77 @@ +import { DefaultPubsubTopic, LightNode } from "@waku/interfaces"; +import { + createDecoder, + createEncoder, + DecodedMessage, + utf8ToBytes +} from "@waku/sdk"; +import { expect } from "chai"; +import { describe } from "mocha"; + +import { + afterEachCustom, + beforeEachCustom, + ServiceNodesFleet +} from "../../src/index.js"; +import { + runMultipleNodes, + teardownNodesWithRedundancy +} from "../filter/utils.js"; + +//TODO: add unit tests, + +describe("Waku Filter: Peer Management: E2E", function () { + this.timeout(15000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + + beforeEachCustom(this, async () => { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + undefined, + undefined, + 5 + ); + }); + + afterEachCustom(this, async () => { + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + const pubsubTopic = DefaultPubsubTopic; + const contentTopic = "/test"; + + const encoder = createEncoder({ + pubsubTopic, + contentTopic + }); + + const decoder = createDecoder(contentTopic, pubsubTopic); + + it("Number of peers are maintained correctly", async function () { + const { error, subscription } = + await waku.filter.createSubscription(pubsubTopic); + if (!subscription || error) { + expect.fail("Could not create subscription"); + } + + const messages: DecodedMessage[] = []; + const { failures, successes } = await subscription.subscribe( + [decoder], + (msg) => { + messages.push(msg); + } + ); + + await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello_World") + }); + + expect(successes.length).to.be.greaterThan(0); + expect(successes.length).to.be.equal(waku.filter.numPeersToUse); + + if (failures) { + expect(failures.length).to.equal(0); + } + }); +});