From d483644a4bb4350df380719b9bcfbdd0b1439482 Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Wed, 26 Jul 2023 11:30:48 +0530 Subject: [PATCH] chore!: remove filter v1 (#1433) * rm: v1 * fix v2 imports * remove tests for filter v1 * set filter v1 as default and rm v2 completely * change import name for filter v2 * rename FilterV2 to Filter completely * fix run check --- .cspell.json | 2 - .size-limit.cjs | 2 +- packages/core/src/index.ts | 7 +- .../src/lib/filter/{v2 => }/filter_rpc.ts | 0 .../core/src/lib/filter/{v2 => }/index.ts | 22 +- packages/core/src/lib/filter/v1/filter_rpc.ts | 53 ---- packages/core/src/lib/filter/v1/index.ts | 248 ------------------ packages/core/src/lib/waku.ts | 5 +- packages/interfaces/src/filter.ts | 8 +- packages/interfaces/src/protocols.ts | 6 - packages/interfaces/src/waku.ts | 8 +- packages/sdk/src/create.ts | 33 +-- packages/tests/tests/ephemeral.node.spec.ts | 6 +- ...er_v2.node.spec.ts => filter.node.spec.ts} | 10 +- packages/tests/tests/filter_v1.node.spec.ts | 128 --------- packages/tests/tests/utils.spec.ts | 120 +-------- 16 files changed, 38 insertions(+), 620 deletions(-) rename packages/core/src/lib/filter/{v2 => }/filter_rpc.ts (100%) rename packages/core/src/lib/filter/{v2 => }/index.ts (95%) delete mode 100644 packages/core/src/lib/filter/v1/filter_rpc.ts delete mode 100644 packages/core/src/lib/filter/v1/index.ts rename packages/tests/tests/{filter_v2.node.spec.ts => filter.node.spec.ts} (96%) delete mode 100644 packages/tests/tests/filter_v1.node.spec.ts diff --git a/.cspell.json b/.cspell.json index a3b2e437e1..5f5de115bf 100644 --- a/.cspell.json +++ b/.cspell.json @@ -39,8 +39,6 @@ "exponentiate", "extip", "fanout", - "Filterv1", - "Filterv2", "floodsub", "fontsource", "globby", diff --git a/.size-limit.cjs b/.size-limit.cjs index 9b2554e32f..9be27499fc 100644 --- a/.size-limit.cjs +++ b/.size-limit.cjs @@ -35,7 +35,7 @@ module.exports = [ { name: "Light protocols", path: "packages/core/bundle/index.js", - import: "{ wakuLightPush, wakuFilterV1, wakuFilterV2 }", + import: "{ wakuLightPush, wakuFilter }", }, { name: "History retrieval protocols", diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index a832cc0506..61af8154bc 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -11,11 +11,8 @@ export * as message from "./lib/message/index.js"; export * as waku from "./lib/waku.js"; export { WakuNode, WakuOptions } from "./lib/waku.js"; -export * as waku_filter_v1 from "./lib/filter/v1/index.js"; -export { wakuFilter as wakuFilterV1 } from "./lib/filter/v1/index.js"; - -export * as waku_filter_v2 from "./lib/filter/v2/index.js"; -export { wakuFilterV2 } from "./lib/filter/v2/index.js"; +export * as waku_filter from "./lib/filter/index.js"; +export { wakuFilter } from "./lib/filter/index.js"; export * as waku_light_push from "./lib/light_push/index.js"; export { wakuLightPush, LightPushCodec } from "./lib/light_push/index.js"; diff --git a/packages/core/src/lib/filter/v2/filter_rpc.ts b/packages/core/src/lib/filter/filter_rpc.ts similarity index 100% rename from packages/core/src/lib/filter/v2/filter_rpc.ts rename to packages/core/src/lib/filter/filter_rpc.ts diff --git a/packages/core/src/lib/filter/v2/index.ts b/packages/core/src/lib/filter/index.ts similarity index 95% rename from packages/core/src/lib/filter/v2/index.ts rename to packages/core/src/lib/filter/index.ts index 313f82f5a8..e17654a55a 100644 --- a/packages/core/src/lib/filter/v2/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -8,7 +8,7 @@ import type { IAsyncIterator, IDecodedMessage, IDecoder, - IFilterV2, + IFilter, IProtoMessage, IReceiver, Libp2p, @@ -25,8 +25,8 @@ import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; -import { BaseProtocol } from "../../base_protocol.js"; -import { DefaultPubSubTopic } from "../../constants.js"; +import { BaseProtocol } from "../base_protocol.js"; +import { DefaultPubSubTopic } from "../constants.js"; import { FilterPushRpc, @@ -41,7 +41,7 @@ type SubscriptionCallback = { callback: Callback; }; -const FilterV2Codecs = { +const FilterCodecs = { SUBSCRIBE: "/vac/waku/filter-subscribe/2.0.0-beta1", PUSH: "/vac/waku/filter-push/2.0.0-beta1", }; @@ -225,7 +225,7 @@ class Subscription { } } -class FilterV2 extends BaseProtocol implements IReceiver { +class Filter extends BaseProtocol implements IReceiver { private readonly options: ProtocolCreateOptions; private activeSubscriptions = new Map(); @@ -246,10 +246,10 @@ class FilterV2 extends BaseProtocol implements IReceiver { } constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super(FilterV2Codecs.SUBSCRIBE, libp2p.components); + super(FilterCodecs.SUBSCRIBE, libp2p.components); - libp2p.handle(FilterV2Codecs.PUSH, this.onRequest.bind(this)).catch((e) => { - log("Failed to register ", FilterV2Codecs.PUSH, e); + libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => { + log("Failed to register ", FilterCodecs.PUSH, e); }); this.activeSubscriptions = new Map(); @@ -365,10 +365,10 @@ class FilterV2 extends BaseProtocol implements IReceiver { } } -export function wakuFilterV2( +export function wakuFilter( init: Partial = {} -): (libp2p: Libp2p) => IFilterV2 { - return (libp2p: Libp2p) => new FilterV2(libp2p, init); +): (libp2p: Libp2p) => IFilter { + return (libp2p: Libp2p) => new Filter(libp2p, init); } async function pushMessage( diff --git a/packages/core/src/lib/filter/v1/filter_rpc.ts b/packages/core/src/lib/filter/v1/filter_rpc.ts deleted file mode 100644 index 625d66072c..0000000000 --- a/packages/core/src/lib/filter/v1/filter_rpc.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { ContentFilter } from "@waku/interfaces"; -import { proto_filter as proto } from "@waku/proto"; -import { v4 as uuid } from "uuid"; - -/** - * FilterRPC represents a message conforming to the Waku Filter protocol - */ -export class FilterRpc { - public constructor(public proto: proto.FilterRpc) {} - - static createRequest( - topic: string, - contentFilters: ContentFilter[], - requestId?: string, - subscribe = true - ): FilterRpc { - return new FilterRpc({ - requestId: requestId || uuid(), - request: { - subscribe, - topic, - contentFilters, - }, - push: undefined, - }); - } - - /** - * - * @param bytes Uint8Array of bytes from a FilterRPC message - * @returns FilterRpc - */ - static decode(bytes: Uint8Array): FilterRpc { - const res = proto.FilterRpc.decode(bytes); - return new FilterRpc(res); - } - - /** - * Encode the current FilterRPC request to bytes - * @returns Uint8Array - */ - encode(): Uint8Array { - return proto.FilterRpc.encode(this.proto); - } - - get push(): proto.MessagePush | undefined { - return this.proto.push; - } - - get requestId(): string { - return this.proto.requestId; - } -} diff --git a/packages/core/src/lib/filter/v1/index.ts b/packages/core/src/lib/filter/v1/index.ts deleted file mode 100644 index 854b553076..0000000000 --- a/packages/core/src/lib/filter/v1/index.ts +++ /dev/null @@ -1,248 +0,0 @@ -import type { Peer } from "@libp2p/interface-peer-store"; -import type { IncomingStreamData } from "@libp2p/interface-registrar"; -import type { - ActiveSubscriptions, - Callback, - ContentFilter, - IAsyncIterator, - IDecodedMessage, - IDecoder, - IFilter, - Libp2p, - ProtocolCreateOptions, - ProtocolOptions, -} from "@waku/interfaces"; -import { WakuMessage as WakuMessageProto } from "@waku/proto"; -import { groupByContentTopic } from "@waku/utils"; -import { toAsyncIterator } from "@waku/utils"; -import debug from "debug"; -import all from "it-all"; -import * as lp from "it-length-prefixed"; -import { pipe } from "it-pipe"; - -import { BaseProtocol } from "../../base_protocol.js"; -import { DefaultPubSubTopic } from "../../constants.js"; -import { toProtoMessage } from "../../to_proto_message.js"; - -import { FilterRpc } from "./filter_rpc.js"; - -export const FilterCodec = "/vac/waku/filter/2.0.0-beta1"; - -const log = debug("waku:filter"); - -export type UnsubscribeFunction = () => Promise; -export type RequestID = string; - -type Subscription = { - decoders: IDecoder[]; - callback: Callback; - pubSubTopic: string; -}; - -/** - * Implements client side of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/). - * - * Note this currently only works in NodeJS when the Waku node is listening on a port, see: - * - https://github.com/status-im/go-waku/issues/245 - * - https://github.com/status-im/nwaku/issues/948 - */ -class Filter extends BaseProtocol implements IFilter { - options: ProtocolCreateOptions; - private subscriptions: Map; - - constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super(FilterCodec, libp2p.components); - this.options = options ?? {}; - this.subscriptions = new Map(); - libp2p - .handle(this.multicodec, this.onRequest.bind(this)) - .catch((e) => log("Failed to register filter protocol", e)); - } - - /** - * @param decoders Decoder or array of Decoders to use to decode messages, it also specifies the content topics. - * @param callback A function that will be called on each message returned by the filter. - * @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to. - * @returns Unsubscribe function that can be used to end the subscription. - */ - async subscribe( - decoders: IDecoder | IDecoder[], - callback: Callback, - opts?: ProtocolOptions - ): Promise { - const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; - const { pubSubTopic = DefaultPubSubTopic } = this.options; - - const contentTopics = Array.from(groupByContentTopic(decodersArray).keys()); - - const contentFilters = contentTopics.map((contentTopic) => ({ - contentTopic, - })); - const request = FilterRpc.createRequest( - pubSubTopic, - contentFilters, - undefined, - true - ); - - const requestId = request.requestId; - - const peer = await this.getPeer(opts?.peerId); - const stream = await this.newStream(peer); - - try { - const res = await pipe( - [request.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) - ); - - log("response", res); - } catch (e) { - log( - "Error subscribing to peer ", - peer.id.toString(), - "for content topics", - contentTopics, - ": ", - e - ); - throw e; - } - - const subscription: Subscription = { - callback, - decoders: decodersArray, - pubSubTopic, - }; - this.subscriptions.set(requestId, subscription); - - return async () => { - await this.unsubscribe(pubSubTopic, contentFilters, requestId, peer); - this.subscriptions.delete(requestId); - }; - } - - public toSubscriptionIterator( - decoders: IDecoder | IDecoder[], - opts?: ProtocolOptions | undefined - ): Promise> { - return toAsyncIterator(this, decoders, opts); - } - - public getActiveSubscriptions(): ActiveSubscriptions { - const map: ActiveSubscriptions = new Map(); - const subscriptions = this.subscriptions as Map< - RequestID, - Subscription - >; - - for (const item of subscriptions.values()) { - const values = map.get(item.pubSubTopic) || []; - const nextValues = item.decoders.map((decoder) => decoder.contentTopic); - map.set(item.pubSubTopic, [...values, ...nextValues]); - } - - return map; - } - - private onRequest(streamData: IncomingStreamData): void { - log("Receiving message push"); - try { - pipe(streamData.stream, lp.decode, async (source) => { - for await (const bytes of source) { - const res = FilterRpc.decode(bytes.slice()); - if (res.requestId && res.push?.messages?.length) { - await this.pushMessages(res.requestId, res.push.messages); - } - } - }).then( - () => { - log("Receiving pipe closed."); - }, - (e) => { - log("Error with receiving pipe", e); - } - ); - } catch (e) { - log("Error decoding message", e); - } - } - - private async pushMessages( - requestId: string, - messages: WakuMessageProto[] - ): Promise { - const subscription = this.subscriptions.get(requestId) as - | Subscription - | undefined; - if (!subscription) { - log(`No subscription locally registered for request ID ${requestId}`); - return; - } - const { decoders, callback, pubSubTopic } = subscription; - - if (!decoders || !decoders.length) { - log(`No decoder registered for request ID ${requestId}`); - return; - } - - for (const protoMessage of messages) { - const contentTopic = protoMessage.contentTopic; - if (!contentTopic) { - log("Message has no content topic, skipping"); - return; - } - - let didDecodeMsg = false; - // We don't want to wait for decoding failure, just attempt to decode - // all messages and do the call back on the one that works - // noinspection ES6MissingAwait - for (const dec of decoders) { - if (didDecodeMsg) return; - const decoded = await dec.fromProtoObj( - pubSubTopic, - toProtoMessage(protoMessage) - ); - if (!decoded) { - log("Not able to decode message"); - continue; - } - // This is just to prevent more decoding attempt - // TODO: Could be better if we were to abort promises - didDecodeMsg = Boolean(decoded); - await callback(decoded); - } - } - } - - private async unsubscribe( - topic: string, - contentFilters: ContentFilter[], - requestId: string, - peer: Peer - ): Promise { - const unsubscribeRequest = FilterRpc.createRequest( - topic, - contentFilters, - requestId, - false - ); - - const stream = await this.newStream(peer); - try { - await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink); - } catch (e) { - log("Error unsubscribing", e); - throw e; - } - } -} - -export function wakuFilter( - init: Partial = {} -): (libp2p: Libp2p) => IFilter { - return (libp2p: Libp2p) => new Filter(libp2p, init); -} diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index 8b144b7537..9e13db13e9 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -3,7 +3,6 @@ import { isPeerId, PeerId } from "@libp2p/interface-peer-id"; import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; import type { IFilter, - IFilterV2, ILightPush, IRelay, IStore, @@ -47,7 +46,7 @@ export class WakuNode implements Waku { public libp2p: Libp2p; public relay?: IRelay; public store?: IStore; - public filter?: IFilter | IFilterV2; + public filter?: IFilter; public lightPush?: ILightPush; public connectionManager: ConnectionManager; @@ -56,7 +55,7 @@ export class WakuNode implements Waku { libp2p: Libp2p, store?: (libp2p: Libp2p) => IStore, lightPush?: (libp2p: Libp2p) => ILightPush, - filter?: (libp2p: Libp2p) => IFilter | IFilterV2, + filter?: (libp2p: Libp2p) => IFilter, relay?: (libp2p: Libp2p) => IRelay ) { this.libp2p = libp2p; diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index cd753d979f..9896f9d573 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -9,9 +9,7 @@ export type ContentFilter = { contentTopic: string; }; -export type IFilter = IReceiver & IBaseProtocol; - -export interface IFilterV2Subscription { +export interface IFilterSubscription { subscribe( decoders: IDecoder | IDecoder[], callback: Callback @@ -24,10 +22,10 @@ export interface IFilterV2Subscription { unsubscribeAll(): Promise; } -export type IFilterV2 = IReceiver & +export type IFilter = IReceiver & IBaseProtocol & { createSubscription( pubSubTopic?: string, peerId?: PeerId - ): Promise; + ): Promise; }; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index aa9708c347..8194b3f22b 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -52,12 +52,6 @@ export type ProtocolCreateOptions = { * Use recommended bootstrap method to discovery and connect to new nodes. */ defaultBootstrap?: boolean; - /** - * FilterV2 has been set to default - * Use this flag to enable the previous version of the filter protocol - * See [Improved Filter protocol specifications](https://github.com/vacp2p/rfc/pull/562) - */ - useFilterV1?: boolean; }; export type ProtocolOptions = { diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index b39af599d1..058e3d2193 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -2,7 +2,7 @@ import type { Stream } from "@libp2p/interface-connection"; import type { PeerId } from "@libp2p/interface-peer-id"; import type { Multiaddr } from "@multiformats/multiaddr"; -import type { IFilter, IFilterV2 } from "./filter.js"; +import type { IFilter } from "./filter.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; import { Protocols } from "./protocols.js"; @@ -13,7 +13,7 @@ export interface Waku { libp2p: Libp2p; relay?: IRelay; store?: IStore; - filter?: IFilter | IFilterV2; + filter?: IFilter; lightPush?: ILightPush; dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise; @@ -28,7 +28,7 @@ export interface Waku { export interface LightNode extends Waku { relay: undefined; store: IStore; - filter: IFilter | IFilterV2; + filter: IFilter; lightPush: ILightPush; } @@ -42,6 +42,6 @@ export interface RelayNode extends Waku { export interface FullNode extends Waku { relay: IRelay; store: IStore; - filter: IFilter | IFilterV2; + filter: IFilter; lightPush: ILightPush; } diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index 4563d83d49..8a10abf9a2 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -6,8 +6,7 @@ import { webSockets } from "@libp2p/websockets"; import { all as filterAll } from "@libp2p/websockets/filters"; import { DefaultUserAgent, - wakuFilterV1, - wakuFilterV2, + wakuFilter, wakuLightPush, WakuNode, WakuOptions, @@ -16,8 +15,6 @@ import { import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery"; import type { FullNode, - IFilter, - IFilterV2, Libp2p, Libp2pComponents, LightNode, @@ -40,12 +37,7 @@ export { Libp2pComponents }; /** * Create a Waku node that uses Waku Light Push, Filter and Store to send and * receive messages, enabling low resource consumption. - * If `useFilterV1` is set to true, the node will use Filter V1 protocol. - * If `useFilterV1` is set to false or undefined, the node will use Filter V2 protocol. (default behavior) - * - * **Note: This is NOT compatible with nwaku v0.11** - * - * @see https://github.com/status-im/nwaku/issues/1085 + * Uses Waku Filter V2 by default. */ export async function createLightNode( options?: ProtocolCreateOptions & WakuOptions @@ -65,14 +57,7 @@ export async function createLightNode( const store = wakuStore(options); const lightPush = wakuLightPush(options); - - let filter: (libp2p: Libp2p) => IFilter | IFilterV2; - - if (options?.useFilterV1) { - filter = wakuFilterV1(options) as (libp2p: Libp2p) => IFilter; - } else { - filter = wakuFilterV2() as (libp2p: Libp2p) => IFilterV2; - } + const filter = wakuFilter(options); return new WakuNode( options ?? {}, @@ -117,9 +102,6 @@ export async function createRelayNode( /** * Create a Waku node that uses all Waku protocols. - * Implements generics to allow for conditional type checking for Filter V1 and V2 protocols. - * If `useFilterV1` is set to true, the node will use Filter V1 protocol. - * If `useFilterV1` is set to false or undefined, the node will use Filter V2 protocol. (default behavior) * * This helper is not recommended except if: * - you are interfacing with nwaku v0.11 or below @@ -149,14 +131,7 @@ export async function createFullNode( const store = wakuStore(options); const lightPush = wakuLightPush(options); - - let filter: (libp2p: Libp2p) => IFilter | IFilterV2; - if (!options?.useFilterV1) { - filter = wakuFilterV2(); - } else { - filter = wakuFilterV1(options); - } - + const filter = wakuFilter(options); const relay = wakuRelay(options); return new WakuNode( diff --git a/packages/tests/tests/ephemeral.node.spec.ts b/packages/tests/tests/ephemeral.node.spec.ts index 7dce782a06..44724b85df 100644 --- a/packages/tests/tests/ephemeral.node.spec.ts +++ b/packages/tests/tests/ephemeral.node.spec.ts @@ -4,7 +4,7 @@ import { DecodedMessage, waitForRemotePeer, } from "@waku/core"; -import { IFilterV2, IFilterV2Subscription, Protocols } from "@waku/interfaces"; +import { IFilterSubscription, Protocols } from "@waku/interfaces"; import type { LightNode } from "@waku/interfaces"; import { createDecoder as eciesDecoder, @@ -42,7 +42,7 @@ describe("Waku Message Ephemeral field", () => { let waku: LightNode; let nwaku: NimGoNode; - let subscription: IFilterV2Subscription; + let subscription: IFilterSubscription; afterEach(async function () { !!nwaku && @@ -72,7 +72,7 @@ describe("Waku Message Ephemeral field", () => { Protocols.Store, ]); - subscription = await (waku.filter as IFilterV2).createSubscription(); + subscription = await waku.filter.createSubscription(); }); it("Ephemeral messages are not stored", async function () { diff --git a/packages/tests/tests/filter_v2.node.spec.ts b/packages/tests/tests/filter.node.spec.ts similarity index 96% rename from packages/tests/tests/filter_v2.node.spec.ts rename to packages/tests/tests/filter.node.spec.ts index 7fd887f301..3efa31d9af 100644 --- a/packages/tests/tests/filter_v2.node.spec.ts +++ b/packages/tests/tests/filter.node.spec.ts @@ -5,11 +5,7 @@ import { DefaultPubSubTopic, waitForRemotePeer, } from "@waku/core"; -import type { - IFilterV2, - IFilterV2Subscription, - LightNode, -} from "@waku/interfaces"; +import type { IFilterSubscription, LightNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; @@ -33,7 +29,7 @@ describe("Waku Filter: V2", () => { let waku: LightNode; let nwaku: NimGoNode; - let subscription: IFilterV2Subscription; + let subscription: IFilterSubscription; afterEach(async function () { !!nwaku && @@ -56,7 +52,7 @@ describe("Waku Filter: V2", () => { await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - subscription = await (waku.filter as IFilterV2).createSubscription(); + subscription = await waku.filter.createSubscription(); }); it("creates a subscription", async function () { diff --git a/packages/tests/tests/filter_v1.node.spec.ts b/packages/tests/tests/filter_v1.node.spec.ts deleted file mode 100644 index 6105582bcb..0000000000 --- a/packages/tests/tests/filter_v1.node.spec.ts +++ /dev/null @@ -1,128 +0,0 @@ -import { - createDecoder, - createEncoder, - DecodedMessage, - DefaultPubSubTopic, - waitForRemotePeer, -} from "@waku/core"; -import type { IFilter, LightNode } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; -import { createLightNode } from "@waku/sdk"; -import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; -import { expect } from "chai"; -import debug from "debug"; - -import { delay, makeLogFileName, NOISE_KEY_1 } from "../src/index.js"; -import { NimGoNode } from "../src/node/node.js"; - -const log = debug("waku:test"); - -const TestContentTopic = "/test/1/waku-filter"; -const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); -const TestDecoder = createDecoder(TestContentTopic); - -describe("Waku Filter: V1", () => { - let waku: LightNode; - let nwaku: NimGoNode; - - let filter: IFilter; - - afterEach(async function () { - !!nwaku && - nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e)); - !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); - }); - - beforeEach(async function () { - this.timeout(15000); - nwaku = new NimGoNode(makeLogFileName(this)); - await nwaku.start({ - filter: true, - lightpush: true, - relay: true, - legacyFilter: true, - }); - waku = await createLightNode({ - useFilterV1: true, - staticNoiseKey: NOISE_KEY_1, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - - filter = waku.filter as IFilter; - }); - - it("creates a subscription", async function () { - this.timeout(10000); - - let messageCount = 0; - const messageText = "Filtering works!"; - const message = { payload: utf8ToBytes(messageText) }; - - const callback = (msg: DecodedMessage): void => { - log("Got a message"); - messageCount++; - expect(msg.contentTopic).to.eq(TestContentTopic); - expect(msg.pubSubTopic).to.eq(DefaultPubSubTopic); - expect(bytesToUtf8(msg.payload)).to.eq(messageText); - }; - - await filter.subscribe([TestDecoder], callback); - // As the filter protocol does not cater for an ack of subscription - // we cannot know whether the subscription happened. Something we want to - // correct in future versions of the protocol. - await delay(200); - - await waku.lightPush.send(TestEncoder, message); - while (messageCount === 0) { - await delay(250); - } - expect(messageCount).to.eq(1); - }); - - it("handles multiple messages", async function () { - this.timeout(10000); - - let messageCount = 0; - const callback = (msg: DecodedMessage): void => { - messageCount++; - expect(msg.contentTopic).to.eq(TestContentTopic); - }; - await filter.subscribe(TestDecoder, callback); - - await delay(200); - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("Filtering works!"), - }); - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("Filtering still works!"), - }); - while (messageCount < 2) { - await delay(250); - } - expect(messageCount).to.eq(2); - }); - - it("unsubscribes", async function () { - let messageCount = 0; - const callback = (): void => { - messageCount++; - }; - const unsubscribe = await filter.subscribe([TestDecoder], callback); - - await delay(200); - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("This should be received"), - }); - await delay(100); - await unsubscribe(); - await delay(200); - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("This should not be received"), - }); - await delay(100); - expect(messageCount).to.eq(1); - }); -}); diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index 55c0a23b54..6a9426cb39 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -4,7 +4,7 @@ import { DefaultPubSubTopic, waitForRemotePeer, } from "@waku/core"; -import type { IFilter, IFilterV2, LightNode } from "@waku/interfaces"; +import type { LightNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { toAsyncIterator } from "@waku/utils"; @@ -18,119 +18,10 @@ const TestContentTopic = "/test/1/waku-filter"; const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); const TestDecoder = createDecoder(TestContentTopic); -describe("Util: toAsyncIterator: FilterV1", () => { +describe("Util: toAsyncIterator: Filter", () => { let waku: LightNode; let nwaku: NimGoNode; - let filter: IFilter; - - beforeEach(async function () { - this.timeout(15000); - nwaku = new NimGoNode(makeLogFileName(this)); - await nwaku.start({ - filter: true, - lightpush: true, - relay: true, - legacyFilter: true, - }); - waku = await createLightNode({ - useFilterV1: true, - staticNoiseKey: NOISE_KEY_1, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - filter = waku.filter as IFilter; - }); - - afterEach(async () => { - try { - await nwaku.stop(); - await waku.stop(); - } catch (err) { - console.log("Failed to stop", err); - } - }); - - it("creates an iterator", async function () { - this.timeout(10000); - const messageText = "hey, what's up?"; - const sent = { payload: utf8ToBytes(messageText) }; - - const { iterator } = await toAsyncIterator( - filter, - TestDecoder, - {}, - { timeoutMs: 1000 } - ); - - await waku.lightPush.send(TestEncoder, sent); - const { value } = await iterator.next(); - - expect(value.contentTopic).to.eq(TestContentTopic); - expect(value.pubSubTopic).to.eq(DefaultPubSubTopic); - expect(bytesToUtf8(value.payload)).to.eq(messageText); - }); - - it("handles multiple messages", async function () { - this.timeout(10000); - const { iterator } = await toAsyncIterator( - filter, - TestDecoder, - {}, - { timeoutMs: 1000 } - ); - - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("Filtering works!"), - }); - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("Filtering still works!"), - }); - - let result = await iterator.next(); - expect(bytesToUtf8(result.value.payload)).to.eq("Filtering works!"); - - result = await iterator.next(); - expect(bytesToUtf8(result.value.payload)).to.eq("Filtering still works!"); - }); - - it("unsubscribes", async function () { - this.timeout(10000); - const { iterator, stop } = await toAsyncIterator( - filter, - TestDecoder, - {}, - { timeoutMs: 1000 } - ); - - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("This should be received"), - }); - - await stop(); - - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("This should not be received"), - }); - - let result = await iterator.next(); - expect(result.done).to.eq(true); - expect(bytesToUtf8(result.value.payload)).to.eq("This should be received"); - - result = await iterator.next(); - expect(result.value).to.eq(undefined); - expect(result.done).to.eq(true); - }); -}); - -describe("Util: toAsyncIterator: FilterV2", () => { - let waku: LightNode; - let nwaku: NimGoNode; - - let filter: IFilterV2; - beforeEach(async function () { this.timeout(15000); nwaku = new NimGoNode(makeLogFileName(this)); @@ -142,7 +33,6 @@ describe("Util: toAsyncIterator: FilterV2", () => { await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - filter = waku.filter as IFilterV2; }); afterEach(async () => { @@ -160,7 +50,7 @@ describe("Util: toAsyncIterator: FilterV2", () => { const sent = { payload: utf8ToBytes(messageText) }; const { iterator } = await toAsyncIterator( - filter, + waku.filter, TestDecoder, {}, { timeoutMs: 1000 } @@ -177,7 +67,7 @@ describe("Util: toAsyncIterator: FilterV2", () => { it("handles multiple messages", async function () { this.timeout(10000); const { iterator } = await toAsyncIterator( - filter, + waku.filter, TestDecoder, {}, { timeoutMs: 1000 } @@ -200,7 +90,7 @@ describe("Util: toAsyncIterator: FilterV2", () => { it("unsubscribes", async function () { this.timeout(10000); const { iterator, stop } = await toAsyncIterator( - filter, + waku.filter, TestDecoder, {}, { timeoutMs: 1000 }