diff --git a/packages/core/src/lib/filter/v1/index.ts b/packages/core/src/lib/filter/v1/index.ts index 814a6192a6..a08e49da15 100644 --- a/packages/core/src/lib/filter/v1/index.ts +++ b/packages/core/src/lib/filter/v1/index.ts @@ -6,7 +6,7 @@ import type { Callback, IDecodedMessage, IDecoder, - IFilter, + IFilterV1, ProtocolCreateOptions, ProtocolOptions, } from "@waku/interfaces"; @@ -45,7 +45,7 @@ type Subscription = { * - https://github.com/status-im/go-waku/issues/245 * - https://github.com/status-im/nwaku/issues/948 */ -class Filter extends BaseProtocol implements IFilter { +class Filter extends BaseProtocol implements IFilterV1 { options: ProtocolCreateOptions; private subscriptions: Map; @@ -235,6 +235,6 @@ class Filter extends BaseProtocol implements IFilter { export function wakuFilter( init: Partial = {} -): (libp2p: Libp2p) => IFilter { +): (libp2p: Libp2p) => IFilterV1 { return (libp2p: Libp2p) => new Filter(libp2p, init); } diff --git a/packages/core/src/lib/filter/v2/index.ts b/packages/core/src/lib/filter/v2/index.ts index c53b4fa925..b8f4a1758e 100644 --- a/packages/core/src/lib/filter/v2/index.ts +++ b/packages/core/src/lib/filter/v2/index.ts @@ -6,7 +6,7 @@ import type { Callback, IDecodedMessage, IDecoder, - IFilter, + IFilterV2, IProtoMessage, ProtocolCreateOptions, ProtocolOptions, @@ -50,7 +50,7 @@ const FilterV2Codecs = { * - https://github.com/status-im/go-waku/issues/245 * - https://github.com/status-im/nwaku/issues/948 */ -class FilterV2 extends BaseProtocol implements IFilter { +class FilterV2 extends BaseProtocol implements IFilterV2 { options: ProtocolCreateOptions; private subscriptions: Map; @@ -143,6 +143,74 @@ class FilterV2 extends BaseProtocol implements IFilter { }; } + public async unsubscribeAll(): Promise { + const { pubSubTopic = DefaultPubSubTopic } = this.options; + + const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubSubTopic); + + const peer = await this.getPeer(); + const stream = await this.newStream(peer); + + try { + const res = await pipe( + [request.encode()], + lp.encode(), + stream, + lp.decode(), + async (source) => await all(source) + ); + + const { statusCode, requestId } = FilterSubscribeResponse.decode( + res[0].slice() + ); + + if (statusCode < 200 || statusCode >= 300) { + throw new Error( + `Filter unsubscribe all request ${requestId} failed with status code ${statusCode}` + ); + } + + log("Unsubscribed from all content topics"); + } catch (error) { + log("Error unsubscribing from all content topics: ", error); + throw error; + } + } + + public async ping(): Promise { + const { pubSubTopic = DefaultPubSubTopic } = this.options; + + const request = FilterSubscribeRpc.createSubscriberPingRequest(pubSubTopic); + + const peer = await this.getPeer(); + const stream = await this.newStream(peer); + + try { + const res = await pipe( + [request.encode()], + lp.encode(), + stream, + lp.decode(), + async (source) => await all(source) + ); + + const { statusCode, requestId } = FilterSubscribeResponse.decode( + res[0].slice() + ); + + if (statusCode < 200 || statusCode >= 300) { + throw new Error( + `Filter ping request ${requestId} failed with status code ${statusCode}` + ); + } + + log("Ping successful"); + } catch (error) { + log("Error pinging: ", error); + throw error; + } + } + public getActiveSubscriptions(): ActiveSubscriptions { const map: ActiveSubscriptions = new Map(); const subscriptions = this.subscriptions as Map< @@ -261,6 +329,6 @@ class FilterV2 extends BaseProtocol implements IFilter { export function wakuFilterV2( init: Partial = {} -): (libp2p: Libp2p) => IFilter { +): (libp2p: Libp2p) => IFilterV2 { return (libp2p: Libp2p) => new FilterV2(libp2p, init); } diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index 8728b3a958..e920c2a707 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -3,7 +3,8 @@ import type { Libp2p } from "@libp2p/interface-libp2p"; import type { PeerId } from "@libp2p/interface-peer-id"; import type { Multiaddr } from "@multiformats/multiaddr"; import type { - IFilter, + IFilterV1, + IFilterV2, ILightPush, IRelay, IStore, @@ -46,7 +47,7 @@ export class WakuNode implements Waku { public libp2p: Libp2p; public relay?: IRelay; public store?: IStore; - public filter?: IFilter; + public filter?: IFilterV1 | IFilterV2; public lightPush?: ILightPush; public connectionManager: ConnectionManager; @@ -55,7 +56,7 @@ export class WakuNode implements Waku { libp2p: Libp2p, store?: (libp2p: Libp2p) => IStore, lightPush?: (libp2p: Libp2p) => ILightPush, - filter?: (libp2p: Libp2p) => IFilter, + filter?: (libp2p: Libp2p) => IFilterV1 | IFilterV2, relay?: (libp2p: Libp2p) => IRelay ) { this.libp2p = libp2p; diff --git a/packages/create/src/index.ts b/packages/create/src/index.ts index 2ea9689f86..605f6b4874 100644 --- a/packages/create/src/index.ts +++ b/packages/create/src/index.ts @@ -20,7 +20,8 @@ import { import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery"; import type { FullNode, - IFilter, + IFilterV1, + IFilterV2, LightNode, ProtocolCreateOptions, RelayNode, @@ -63,7 +64,7 @@ export async function createLightNode( const store = wakuStore(options); const lightPush = wakuLightPush(options); - let filter: (libp2p: Libp2p) => IFilter; + let filter: (libp2p: Libp2p) => IFilterV1 | IFilterV2; if (!options?.useFilterV2) { filter = wakuFilter(options); } else { @@ -143,7 +144,7 @@ export async function createFullNode( const store = wakuStore(options); const lightPush = wakuLightPush(options); - let filter: (libp2p: Libp2p) => IFilter; + let filter: (libp2p: Libp2p) => IFilterV1 | IFilterV2; if (!options?.useFilterV2) { filter = wakuFilter(options); } else { diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index c3bd251068..881b681021 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -1,4 +1,5 @@ import type { PointToPointProtocol } from "./protocols.js"; -import type { IReceiver } from "./receiver.js"; +import type { IReceiverV1, IReceiverV2 } from "./receiver.js"; -export type IFilter = IReceiver & PointToPointProtocol; +export type IFilterV1 = IReceiverV1 & PointToPointProtocol; +export type IFilterV2 = IReceiverV2 & PointToPointProtocol; diff --git a/packages/interfaces/src/receiver.ts b/packages/interfaces/src/receiver.ts index 055854ca98..2cc07f2055 100644 --- a/packages/interfaces/src/receiver.ts +++ b/packages/interfaces/src/receiver.ts @@ -7,7 +7,7 @@ type ContentTopic = string; export type ActiveSubscriptions = Map; -export interface IReceiver { +export interface IReceiverV1 { subscribe: ( decoders: IDecoder | IDecoder[], callback: Callback, @@ -15,3 +15,8 @@ export interface IReceiver { ) => Unsubscribe | Promise; getActiveSubscriptions: () => ActiveSubscriptions; } + +export interface IReceiverV2 extends IReceiverV1 { + ping: () => Promise; + unsubscribeAll: () => Promise; +} diff --git a/packages/interfaces/src/relay.ts b/packages/interfaces/src/relay.ts index 5683b13b44..b423382918 100644 --- a/packages/interfaces/src/relay.ts +++ b/packages/interfaces/src/relay.ts @@ -1,7 +1,7 @@ import type { GossipSub } from "@chainsafe/libp2p-gossipsub"; import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; -import { IReceiver } from "./receiver.js"; +import { IReceiverV1 } from "./receiver.js"; import type { ISender } from "./sender.js"; interface IRelayAPI { @@ -10,4 +10,4 @@ interface IRelayAPI { getMeshPeers: (topic?: TopicStr) => PeerIdStr[]; } -export type IRelay = IRelayAPI & ISender & IReceiver; +export type IRelay = IRelayAPI & ISender & IReceiverV1; diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index c03652dbd1..6d2d5504aa 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -3,7 +3,7 @@ import type { Libp2p } from "@libp2p/interface-libp2p"; import type { PeerId } from "@libp2p/interface-peer-id"; import type { Multiaddr } from "@multiformats/multiaddr"; -import type { IFilter } from "./filter.js"; +import type { IFilterV1, IFilterV2 } from "./filter.js"; import type { ILightPush } from "./light_push.js"; import { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; @@ -13,7 +13,7 @@ export interface Waku { libp2p: Libp2p; relay?: IRelay; store?: IStore; - filter?: IFilter; + filter?: IFilterV1 | IFilterV2; lightPush?: ILightPush; dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise; @@ -28,7 +28,7 @@ export interface Waku { export interface LightNode extends Waku { relay: undefined; store: IStore; - filter: IFilter; + filter: IFilterV1 | IFilterV2; lightPush: ILightPush; } @@ -42,6 +42,6 @@ export interface RelayNode extends Waku { export interface FullNode extends Waku { relay: IRelay; store: IStore; - filter: IFilter; + filter: IFilterV1 | IFilterV2; lightPush: ILightPush; }