chore!: remove filter v1 (#1433)

* rm: v1

* fix v2 imports

* remove tests for filter v1

* set filter v1 as default and rm v2 completely

* change import name for filter v2

* rename FilterV2 to Filter completely

* fix run check
This commit is contained in:
Danish Arora 2023-07-26 11:30:48 +05:30 committed by GitHub
parent 92bb35a4f8
commit d483644a4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 38 additions and 620 deletions

View File

@ -39,8 +39,6 @@
"exponentiate",
"extip",
"fanout",
"Filterv1",
"Filterv2",
"floodsub",
"fontsource",
"globby",

View File

@ -35,7 +35,7 @@ module.exports = [
{
name: "Light protocols",
path: "packages/core/bundle/index.js",
import: "{ wakuLightPush, wakuFilterV1, wakuFilterV2 }",
import: "{ wakuLightPush, wakuFilter }",
},
{
name: "History retrieval protocols",

View File

@ -11,11 +11,8 @@ export * as message from "./lib/message/index.js";
export * as waku from "./lib/waku.js";
export { WakuNode, WakuOptions } from "./lib/waku.js";
export * as waku_filter_v1 from "./lib/filter/v1/index.js";
export { wakuFilter as wakuFilterV1 } from "./lib/filter/v1/index.js";
export * as waku_filter_v2 from "./lib/filter/v2/index.js";
export { wakuFilterV2 } from "./lib/filter/v2/index.js";
export * as waku_filter from "./lib/filter/index.js";
export { wakuFilter } from "./lib/filter/index.js";
export * as waku_light_push from "./lib/light_push/index.js";
export { wakuLightPush, LightPushCodec } from "./lib/light_push/index.js";

View File

@ -8,7 +8,7 @@ import type {
IAsyncIterator,
IDecodedMessage,
IDecoder,
IFilterV2,
IFilter,
IProtoMessage,
IReceiver,
Libp2p,
@ -25,8 +25,8 @@ import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { BaseProtocol } from "../../base_protocol.js";
import { DefaultPubSubTopic } from "../../constants.js";
import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubSubTopic } from "../constants.js";
import {
FilterPushRpc,
@ -41,7 +41,7 @@ type SubscriptionCallback<T extends IDecodedMessage> = {
callback: Callback<T>;
};
const FilterV2Codecs = {
const FilterCodecs = {
SUBSCRIBE: "/vac/waku/filter-subscribe/2.0.0-beta1",
PUSH: "/vac/waku/filter-push/2.0.0-beta1",
};
@ -225,7 +225,7 @@ class Subscription {
}
}
class FilterV2 extends BaseProtocol implements IReceiver {
class Filter extends BaseProtocol implements IReceiver {
private readonly options: ProtocolCreateOptions;
private activeSubscriptions = new Map<string, Subscription>();
@ -246,10 +246,10 @@ class FilterV2 extends BaseProtocol implements IReceiver {
}
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterV2Codecs.SUBSCRIBE, libp2p.components);
super(FilterCodecs.SUBSCRIBE, libp2p.components);
libp2p.handle(FilterV2Codecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log("Failed to register ", FilterV2Codecs.PUSH, e);
libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log("Failed to register ", FilterCodecs.PUSH, e);
});
this.activeSubscriptions = new Map();
@ -365,10 +365,10 @@ class FilterV2 extends BaseProtocol implements IReceiver {
}
}
export function wakuFilterV2(
export function wakuFilter(
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => IFilterV2 {
return (libp2p: Libp2p) => new FilterV2(libp2p, init);
): (libp2p: Libp2p) => IFilter {
return (libp2p: Libp2p) => new Filter(libp2p, init);
}
async function pushMessage<T extends IDecodedMessage>(

View File

@ -1,53 +0,0 @@
import { ContentFilter } from "@waku/interfaces";
import { proto_filter as proto } from "@waku/proto";
import { v4 as uuid } from "uuid";
/**
* FilterRPC represents a message conforming to the Waku Filter protocol
*/
export class FilterRpc {
public constructor(public proto: proto.FilterRpc) {}
static createRequest(
topic: string,
contentFilters: ContentFilter[],
requestId?: string,
subscribe = true
): FilterRpc {
return new FilterRpc({
requestId: requestId || uuid(),
request: {
subscribe,
topic,
contentFilters,
},
push: undefined,
});
}
/**
*
* @param bytes Uint8Array of bytes from a FilterRPC message
* @returns FilterRpc
*/
static decode(bytes: Uint8Array): FilterRpc {
const res = proto.FilterRpc.decode(bytes);
return new FilterRpc(res);
}
/**
* Encode the current FilterRPC request to bytes
* @returns Uint8Array
*/
encode(): Uint8Array {
return proto.FilterRpc.encode(this.proto);
}
get push(): proto.MessagePush | undefined {
return this.proto.push;
}
get requestId(): string {
return this.proto.requestId;
}
}

View File

@ -1,248 +0,0 @@
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type {
ActiveSubscriptions,
Callback,
ContentFilter,
IAsyncIterator,
IDecodedMessage,
IDecoder,
IFilter,
Libp2p,
ProtocolCreateOptions,
ProtocolOptions,
} from "@waku/interfaces";
import { WakuMessage as WakuMessageProto } from "@waku/proto";
import { groupByContentTopic } from "@waku/utils";
import { toAsyncIterator } from "@waku/utils";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { BaseProtocol } from "../../base_protocol.js";
import { DefaultPubSubTopic } from "../../constants.js";
import { toProtoMessage } from "../../to_proto_message.js";
import { FilterRpc } from "./filter_rpc.js";
export const FilterCodec = "/vac/waku/filter/2.0.0-beta1";
const log = debug("waku:filter");
export type UnsubscribeFunction = () => Promise<void>;
export type RequestID = string;
type Subscription<T extends IDecodedMessage> = {
decoders: IDecoder<T>[];
callback: Callback<T>;
pubSubTopic: string;
};
/**
* Implements client side of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/).
*
* Note this currently only works in NodeJS when the Waku node is listening on a port, see:
* - https://github.com/status-im/go-waku/issues/245
* - https://github.com/status-im/nwaku/issues/948
*/
class Filter extends BaseProtocol implements IFilter {
options: ProtocolCreateOptions;
private subscriptions: Map<RequestID, unknown>;
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodec, libp2p.components);
this.options = options ?? {};
this.subscriptions = new Map();
libp2p
.handle(this.multicodec, this.onRequest.bind(this))
.catch((e) => log("Failed to register filter protocol", e));
}
/**
* @param decoders Decoder or array of Decoders to use to decode messages, it also specifies the content topics.
* @param callback A function that will be called on each message returned by the filter.
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
* @returns Unsubscribe function that can be used to end the subscription.
*/
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
): Promise<UnsubscribeFunction> {
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
const { pubSubTopic = DefaultPubSubTopic } = this.options;
const contentTopics = Array.from(groupByContentTopic(decodersArray).keys());
const contentFilters = contentTopics.map((contentTopic) => ({
contentTopic,
}));
const request = FilterRpc.createRequest(
pubSubTopic,
contentFilters,
undefined,
true
);
const requestId = request.requestId;
const peer = await this.getPeer(opts?.peerId);
const stream = await this.newStream(peer);
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
log("response", res);
} catch (e) {
log(
"Error subscribing to peer ",
peer.id.toString(),
"for content topics",
contentTopics,
": ",
e
);
throw e;
}
const subscription: Subscription<T> = {
callback,
decoders: decodersArray,
pubSubTopic,
};
this.subscriptions.set(requestId, subscription);
return async () => {
await this.unsubscribe(pubSubTopic, contentFilters, requestId, peer);
this.subscriptions.delete(requestId);
};
}
public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
opts?: ProtocolOptions | undefined
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders, opts);
}
public getActiveSubscriptions(): ActiveSubscriptions {
const map: ActiveSubscriptions = new Map();
const subscriptions = this.subscriptions as Map<
RequestID,
Subscription<IDecodedMessage>
>;
for (const item of subscriptions.values()) {
const values = map.get(item.pubSubTopic) || [];
const nextValues = item.decoders.map((decoder) => decoder.contentTopic);
map.set(item.pubSubTopic, [...values, ...nextValues]);
}
return map;
}
private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push");
try {
pipe(streamData.stream, lp.decode, async (source) => {
for await (const bytes of source) {
const res = FilterRpc.decode(bytes.slice());
if (res.requestId && res.push?.messages?.length) {
await this.pushMessages(res.requestId, res.push.messages);
}
}
}).then(
() => {
log("Receiving pipe closed.");
},
(e) => {
log("Error with receiving pipe", e);
}
);
} catch (e) {
log("Error decoding message", e);
}
}
private async pushMessages<T extends IDecodedMessage>(
requestId: string,
messages: WakuMessageProto[]
): Promise<void> {
const subscription = this.subscriptions.get(requestId) as
| Subscription<T>
| undefined;
if (!subscription) {
log(`No subscription locally registered for request ID ${requestId}`);
return;
}
const { decoders, callback, pubSubTopic } = subscription;
if (!decoders || !decoders.length) {
log(`No decoder registered for request ID ${requestId}`);
return;
}
for (const protoMessage of messages) {
const contentTopic = protoMessage.contentTopic;
if (!contentTopic) {
log("Message has no content topic, skipping");
return;
}
let didDecodeMsg = false;
// We don't want to wait for decoding failure, just attempt to decode
// all messages and do the call back on the one that works
// noinspection ES6MissingAwait
for (const dec of decoders) {
if (didDecodeMsg) return;
const decoded = await dec.fromProtoObj(
pubSubTopic,
toProtoMessage(protoMessage)
);
if (!decoded) {
log("Not able to decode message");
continue;
}
// This is just to prevent more decoding attempt
// TODO: Could be better if we were to abort promises
didDecodeMsg = Boolean(decoded);
await callback(decoded);
}
}
}
private async unsubscribe(
topic: string,
contentFilters: ContentFilter[],
requestId: string,
peer: Peer
): Promise<void> {
const unsubscribeRequest = FilterRpc.createRequest(
topic,
contentFilters,
requestId,
false
);
const stream = await this.newStream(peer);
try {
await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink);
} catch (e) {
log("Error unsubscribing", e);
throw e;
}
}
}
export function wakuFilter(
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => IFilter {
return (libp2p: Libp2p) => new Filter(libp2p, init);
}

View File

@ -3,7 +3,6 @@ import { isPeerId, PeerId } from "@libp2p/interface-peer-id";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import type {
IFilter,
IFilterV2,
ILightPush,
IRelay,
IStore,
@ -47,7 +46,7 @@ export class WakuNode implements Waku {
public libp2p: Libp2p;
public relay?: IRelay;
public store?: IStore;
public filter?: IFilter | IFilterV2;
public filter?: IFilter;
public lightPush?: ILightPush;
public connectionManager: ConnectionManager;
@ -56,7 +55,7 @@ export class WakuNode implements Waku {
libp2p: Libp2p,
store?: (libp2p: Libp2p) => IStore,
lightPush?: (libp2p: Libp2p) => ILightPush,
filter?: (libp2p: Libp2p) => IFilter | IFilterV2,
filter?: (libp2p: Libp2p) => IFilter,
relay?: (libp2p: Libp2p) => IRelay
) {
this.libp2p = libp2p;

View File

@ -9,9 +9,7 @@ export type ContentFilter = {
contentTopic: string;
};
export type IFilter = IReceiver & IBaseProtocol;
export interface IFilterV2Subscription {
export interface IFilterSubscription {
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
@ -24,10 +22,10 @@ export interface IFilterV2Subscription {
unsubscribeAll(): Promise<void>;
}
export type IFilterV2 = IReceiver &
export type IFilter = IReceiver &
IBaseProtocol & {
createSubscription(
pubSubTopic?: string,
peerId?: PeerId
): Promise<IFilterV2Subscription>;
): Promise<IFilterSubscription>;
};

View File

@ -52,12 +52,6 @@ export type ProtocolCreateOptions = {
* Use recommended bootstrap method to discovery and connect to new nodes.
*/
defaultBootstrap?: boolean;
/**
* FilterV2 has been set to default
* Use this flag to enable the previous version of the filter protocol
* See [Improved Filter protocol specifications](https://github.com/vacp2p/rfc/pull/562)
*/
useFilterV1?: boolean;
};
export type ProtocolOptions = {

View File

@ -2,7 +2,7 @@ import type { Stream } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Multiaddr } from "@multiformats/multiaddr";
import type { IFilter, IFilterV2 } from "./filter.js";
import type { IFilter } from "./filter.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js";
import { Protocols } from "./protocols.js";
@ -13,7 +13,7 @@ export interface Waku {
libp2p: Libp2p;
relay?: IRelay;
store?: IStore;
filter?: IFilter | IFilterV2;
filter?: IFilter;
lightPush?: ILightPush;
dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>;
@ -28,7 +28,7 @@ export interface Waku {
export interface LightNode extends Waku {
relay: undefined;
store: IStore;
filter: IFilter | IFilterV2;
filter: IFilter;
lightPush: ILightPush;
}
@ -42,6 +42,6 @@ export interface RelayNode extends Waku {
export interface FullNode extends Waku {
relay: IRelay;
store: IStore;
filter: IFilter | IFilterV2;
filter: IFilter;
lightPush: ILightPush;
}

View File

@ -6,8 +6,7 @@ import { webSockets } from "@libp2p/websockets";
import { all as filterAll } from "@libp2p/websockets/filters";
import {
DefaultUserAgent,
wakuFilterV1,
wakuFilterV2,
wakuFilter,
wakuLightPush,
WakuNode,
WakuOptions,
@ -16,8 +15,6 @@ import {
import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery";
import type {
FullNode,
IFilter,
IFilterV2,
Libp2p,
Libp2pComponents,
LightNode,
@ -40,12 +37,7 @@ export { Libp2pComponents };
/**
* Create a Waku node that uses Waku Light Push, Filter and Store to send and
* receive messages, enabling low resource consumption.
* If `useFilterV1` is set to true, the node will use Filter V1 protocol.
* If `useFilterV1` is set to false or undefined, the node will use Filter V2 protocol. (default behavior)
*
* **Note: This is NOT compatible with nwaku v0.11**
*
* @see https://github.com/status-im/nwaku/issues/1085
* Uses Waku Filter V2 by default.
*/
export async function createLightNode(
options?: ProtocolCreateOptions & WakuOptions
@ -65,14 +57,7 @@ export async function createLightNode(
const store = wakuStore(options);
const lightPush = wakuLightPush(options);
let filter: (libp2p: Libp2p) => IFilter | IFilterV2;
if (options?.useFilterV1) {
filter = wakuFilterV1(options) as (libp2p: Libp2p) => IFilter;
} else {
filter = wakuFilterV2() as (libp2p: Libp2p) => IFilterV2;
}
const filter = wakuFilter(options);
return new WakuNode(
options ?? {},
@ -117,9 +102,6 @@ export async function createRelayNode(
/**
* Create a Waku node that uses all Waku protocols.
* Implements generics to allow for conditional type checking for Filter V1 and V2 protocols.
* If `useFilterV1` is set to true, the node will use Filter V1 protocol.
* If `useFilterV1` is set to false or undefined, the node will use Filter V2 protocol. (default behavior)
*
* This helper is not recommended except if:
* - you are interfacing with nwaku v0.11 or below
@ -149,14 +131,7 @@ export async function createFullNode(
const store = wakuStore(options);
const lightPush = wakuLightPush(options);
let filter: (libp2p: Libp2p) => IFilter | IFilterV2;
if (!options?.useFilterV1) {
filter = wakuFilterV2();
} else {
filter = wakuFilterV1(options);
}
const filter = wakuFilter(options);
const relay = wakuRelay(options);
return new WakuNode(

View File

@ -4,7 +4,7 @@ import {
DecodedMessage,
waitForRemotePeer,
} from "@waku/core";
import { IFilterV2, IFilterV2Subscription, Protocols } from "@waku/interfaces";
import { IFilterSubscription, Protocols } from "@waku/interfaces";
import type { LightNode } from "@waku/interfaces";
import {
createDecoder as eciesDecoder,
@ -42,7 +42,7 @@ describe("Waku Message Ephemeral field", () => {
let waku: LightNode;
let nwaku: NimGoNode;
let subscription: IFilterV2Subscription;
let subscription: IFilterSubscription;
afterEach(async function () {
!!nwaku &&
@ -72,7 +72,7 @@ describe("Waku Message Ephemeral field", () => {
Protocols.Store,
]);
subscription = await (waku.filter as IFilterV2).createSubscription();
subscription = await waku.filter.createSubscription();
});
it("Ephemeral messages are not stored", async function () {

View File

@ -5,11 +5,7 @@ import {
DefaultPubSubTopic,
waitForRemotePeer,
} from "@waku/core";
import type {
IFilterV2,
IFilterV2Subscription,
LightNode,
} from "@waku/interfaces";
import type { IFilterSubscription, LightNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
@ -33,7 +29,7 @@ describe("Waku Filter: V2", () => {
let waku: LightNode;
let nwaku: NimGoNode;
let subscription: IFilterV2Subscription;
let subscription: IFilterSubscription;
afterEach(async function () {
!!nwaku &&
@ -56,7 +52,7 @@ describe("Waku Filter: V2", () => {
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
subscription = await (waku.filter as IFilterV2).createSubscription();
subscription = await waku.filter.createSubscription();
});
it("creates a subscription", async function () {

View File

@ -1,128 +0,0 @@
import {
createDecoder,
createEncoder,
DecodedMessage,
DefaultPubSubTopic,
waitForRemotePeer,
} from "@waku/core";
import type { IFilter, LightNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import debug from "debug";
import { delay, makeLogFileName, NOISE_KEY_1 } from "../src/index.js";
import { NimGoNode } from "../src/node/node.js";
const log = debug("waku:test");
const TestContentTopic = "/test/1/waku-filter";
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
const TestDecoder = createDecoder(TestContentTopic);
describe("Waku Filter: V1", () => {
let waku: LightNode;
let nwaku: NimGoNode;
let filter: IFilter;
afterEach(async function () {
!!nwaku &&
nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e));
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
});
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.start({
filter: true,
lightpush: true,
relay: true,
legacyFilter: true,
});
waku = await createLightNode({
useFilterV1: true,
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
filter = waku.filter as IFilter;
});
it("creates a subscription", async function () {
this.timeout(10000);
let messageCount = 0;
const messageText = "Filtering works!";
const message = { payload: utf8ToBytes(messageText) };
const callback = (msg: DecodedMessage): void => {
log("Got a message");
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
expect(msg.pubSubTopic).to.eq(DefaultPubSubTopic);
expect(bytesToUtf8(msg.payload)).to.eq(messageText);
};
await filter.subscribe([TestDecoder], callback);
// As the filter protocol does not cater for an ack of subscription
// we cannot know whether the subscription happened. Something we want to
// correct in future versions of the protocol.
await delay(200);
await waku.lightPush.send(TestEncoder, message);
while (messageCount === 0) {
await delay(250);
}
expect(messageCount).to.eq(1);
});
it("handles multiple messages", async function () {
this.timeout(10000);
let messageCount = 0;
const callback = (msg: DecodedMessage): void => {
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
};
await filter.subscribe(TestDecoder, callback);
await delay(200);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("Filtering works!"),
});
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("Filtering still works!"),
});
while (messageCount < 2) {
await delay(250);
}
expect(messageCount).to.eq(2);
});
it("unsubscribes", async function () {
let messageCount = 0;
const callback = (): void => {
messageCount++;
};
const unsubscribe = await filter.subscribe([TestDecoder], callback);
await delay(200);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("This should be received"),
});
await delay(100);
await unsubscribe();
await delay(200);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("This should not be received"),
});
await delay(100);
expect(messageCount).to.eq(1);
});
});

View File

@ -4,7 +4,7 @@ import {
DefaultPubSubTopic,
waitForRemotePeer,
} from "@waku/core";
import type { IFilter, IFilterV2, LightNode } from "@waku/interfaces";
import type { LightNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { toAsyncIterator } from "@waku/utils";
@ -18,119 +18,10 @@ const TestContentTopic = "/test/1/waku-filter";
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
const TestDecoder = createDecoder(TestContentTopic);
describe("Util: toAsyncIterator: FilterV1", () => {
describe("Util: toAsyncIterator: Filter", () => {
let waku: LightNode;
let nwaku: NimGoNode;
let filter: IFilter;
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.start({
filter: true,
lightpush: true,
relay: true,
legacyFilter: true,
});
waku = await createLightNode({
useFilterV1: true,
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
filter = waku.filter as IFilter;
});
afterEach(async () => {
try {
await nwaku.stop();
await waku.stop();
} catch (err) {
console.log("Failed to stop", err);
}
});
it("creates an iterator", async function () {
this.timeout(10000);
const messageText = "hey, what's up?";
const sent = { payload: utf8ToBytes(messageText) };
const { iterator } = await toAsyncIterator(
filter,
TestDecoder,
{},
{ timeoutMs: 1000 }
);
await waku.lightPush.send(TestEncoder, sent);
const { value } = await iterator.next();
expect(value.contentTopic).to.eq(TestContentTopic);
expect(value.pubSubTopic).to.eq(DefaultPubSubTopic);
expect(bytesToUtf8(value.payload)).to.eq(messageText);
});
it("handles multiple messages", async function () {
this.timeout(10000);
const { iterator } = await toAsyncIterator(
filter,
TestDecoder,
{},
{ timeoutMs: 1000 }
);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("Filtering works!"),
});
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("Filtering still works!"),
});
let result = await iterator.next();
expect(bytesToUtf8(result.value.payload)).to.eq("Filtering works!");
result = await iterator.next();
expect(bytesToUtf8(result.value.payload)).to.eq("Filtering still works!");
});
it("unsubscribes", async function () {
this.timeout(10000);
const { iterator, stop } = await toAsyncIterator(
filter,
TestDecoder,
{},
{ timeoutMs: 1000 }
);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("This should be received"),
});
await stop();
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("This should not be received"),
});
let result = await iterator.next();
expect(result.done).to.eq(true);
expect(bytesToUtf8(result.value.payload)).to.eq("This should be received");
result = await iterator.next();
expect(result.value).to.eq(undefined);
expect(result.done).to.eq(true);
});
});
describe("Util: toAsyncIterator: FilterV2", () => {
let waku: LightNode;
let nwaku: NimGoNode;
let filter: IFilterV2;
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
@ -142,7 +33,6 @@ describe("Util: toAsyncIterator: FilterV2", () => {
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
filter = waku.filter as IFilterV2;
});
afterEach(async () => {
@ -160,7 +50,7 @@ describe("Util: toAsyncIterator: FilterV2", () => {
const sent = { payload: utf8ToBytes(messageText) };
const { iterator } = await toAsyncIterator(
filter,
waku.filter,
TestDecoder,
{},
{ timeoutMs: 1000 }
@ -177,7 +67,7 @@ describe("Util: toAsyncIterator: FilterV2", () => {
it("handles multiple messages", async function () {
this.timeout(10000);
const { iterator } = await toAsyncIterator(
filter,
waku.filter,
TestDecoder,
{},
{ timeoutMs: 1000 }
@ -200,7 +90,7 @@ describe("Util: toAsyncIterator: FilterV2", () => {
it("unsubscribes", async function () {
this.timeout(10000);
const { iterator, stop } = await toAsyncIterator(
filter,
waku.filter,
TestDecoder,
{},
{ timeoutMs: 1000 }