From 691de1a194b1123332a704431e3f304bd68dfaa2 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar Date: Tue, 19 Apr 2022 18:50:32 -0700 Subject: [PATCH 01/19] Add proto --- proto/waku/v2/filter.proto | 25 +++ src/proto/waku/v2/filter.ts | 358 ++++++++++++++++++++++++++++++++++++ 2 files changed, 383 insertions(+) create mode 100644 proto/waku/v2/filter.proto create mode 100644 src/proto/waku/v2/filter.ts diff --git a/proto/waku/v2/filter.proto b/proto/waku/v2/filter.proto new file mode 100644 index 0000000000..2ab26e2dee --- /dev/null +++ b/proto/waku/v2/filter.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package waku.v2; + +import "waku/v2/message.proto"; + +message FilterRequest { + bool subscribe = 1; + string topic = 2; + repeated ContentFilter content_filters = 3; + + message ContentFilter { + string content_topic = 1; + } +} + +message MessagePush { + repeated WakuMessage messages = 1; +} + +message FilterRPC { + string request_id = 1; + FilterRequest request = 2; + MessagePush push = 3; +} \ No newline at end of file diff --git a/src/proto/waku/v2/filter.ts b/src/proto/waku/v2/filter.ts new file mode 100644 index 0000000000..97ab03a3d9 --- /dev/null +++ b/src/proto/waku/v2/filter.ts @@ -0,0 +1,358 @@ +/* eslint-disable */ +import Long from "long"; +import _m0 from "protobufjs/minimal"; +import { WakuMessage } from "../../waku/v2/message"; + +export const protobufPackage = "waku.v2"; + +export interface FilterRequest { + subscribe: boolean; + topic: string; + contentFilters: FilterRequest_ContentFilter[]; +} + +export interface FilterRequest_ContentFilter { + contentTopic: string; +} + +export interface MessagePush { + messages: WakuMessage[]; +} + +export interface FilterRPC { + requestId: string; + request: FilterRequest | undefined; + push: MessagePush | undefined; +} + +function createBaseFilterRequest(): FilterRequest { + return { subscribe: false, topic: "", contentFilters: [] }; +} + +export const FilterRequest = { + encode( + message: FilterRequest, + writer: _m0.Writer = _m0.Writer.create() + ): _m0.Writer { + if (message.subscribe === true) { + writer.uint32(8).bool(message.subscribe); + } + if (message.topic !== "") { + writer.uint32(18).string(message.topic); + } + for (const v of message.contentFilters) { + FilterRequest_ContentFilter.encode(v!, writer.uint32(26).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): FilterRequest { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseFilterRequest(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.subscribe = reader.bool(); + break; + case 2: + message.topic = reader.string(); + break; + case 3: + message.contentFilters.push( + FilterRequest_ContentFilter.decode(reader, reader.uint32()) + ); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): FilterRequest { + return { + subscribe: isSet(object.subscribe) ? Boolean(object.subscribe) : false, + topic: isSet(object.topic) ? String(object.topic) : "", + contentFilters: Array.isArray(object?.contentFilters) + ? object.contentFilters.map((e: any) => + FilterRequest_ContentFilter.fromJSON(e) + ) + : [], + }; + }, + + toJSON(message: FilterRequest): unknown { + const obj: any = {}; + message.subscribe !== undefined && (obj.subscribe = message.subscribe); + message.topic !== undefined && (obj.topic = message.topic); + if (message.contentFilters) { + obj.contentFilters = message.contentFilters.map((e) => + e ? FilterRequest_ContentFilter.toJSON(e) : undefined + ); + } else { + obj.contentFilters = []; + } + return obj; + }, + + fromPartial, I>>( + object: I + ): FilterRequest { + const message = createBaseFilterRequest(); + message.subscribe = object.subscribe ?? false; + message.topic = object.topic ?? ""; + message.contentFilters = + object.contentFilters?.map((e) => + FilterRequest_ContentFilter.fromPartial(e) + ) || []; + return message; + }, +}; + +function createBaseFilterRequest_ContentFilter(): FilterRequest_ContentFilter { + return { contentTopic: "" }; +} + +export const FilterRequest_ContentFilter = { + encode( + message: FilterRequest_ContentFilter, + writer: _m0.Writer = _m0.Writer.create() + ): _m0.Writer { + if (message.contentTopic !== "") { + writer.uint32(10).string(message.contentTopic); + } + return writer; + }, + + decode( + input: _m0.Reader | Uint8Array, + length?: number + ): FilterRequest_ContentFilter { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseFilterRequest_ContentFilter(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.contentTopic = reader.string(); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): FilterRequest_ContentFilter { + return { + contentTopic: isSet(object.contentTopic) + ? String(object.contentTopic) + : "", + }; + }, + + toJSON(message: FilterRequest_ContentFilter): unknown { + const obj: any = {}; + message.contentTopic !== undefined && + (obj.contentTopic = message.contentTopic); + return obj; + }, + + fromPartial, I>>( + object: I + ): FilterRequest_ContentFilter { + const message = createBaseFilterRequest_ContentFilter(); + message.contentTopic = object.contentTopic ?? ""; + return message; + }, +}; + +function createBaseMessagePush(): MessagePush { + return { messages: [] }; +} + +export const MessagePush = { + encode( + message: MessagePush, + writer: _m0.Writer = _m0.Writer.create() + ): _m0.Writer { + for (const v of message.messages) { + WakuMessage.encode(v!, writer.uint32(10).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): MessagePush { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseMessagePush(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.messages.push(WakuMessage.decode(reader, reader.uint32())); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): MessagePush { + return { + messages: Array.isArray(object?.messages) + ? object.messages.map((e: any) => WakuMessage.fromJSON(e)) + : [], + }; + }, + + toJSON(message: MessagePush): unknown { + const obj: any = {}; + if (message.messages) { + obj.messages = message.messages.map((e) => + e ? WakuMessage.toJSON(e) : undefined + ); + } else { + obj.messages = []; + } + return obj; + }, + + fromPartial, I>>( + object: I + ): MessagePush { + const message = createBaseMessagePush(); + message.messages = + object.messages?.map((e) => WakuMessage.fromPartial(e)) || []; + return message; + }, +}; + +function createBaseFilterRPC(): FilterRPC { + return { requestId: "", request: undefined, push: undefined }; +} + +export const FilterRPC = { + encode( + message: FilterRPC, + writer: _m0.Writer = _m0.Writer.create() + ): _m0.Writer { + if (message.requestId !== "") { + writer.uint32(10).string(message.requestId); + } + if (message.request !== undefined) { + FilterRequest.encode(message.request, writer.uint32(18).fork()).ldelim(); + } + if (message.push !== undefined) { + MessagePush.encode(message.push, writer.uint32(26).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): FilterRPC { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseFilterRPC(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.requestId = reader.string(); + break; + case 2: + message.request = FilterRequest.decode(reader, reader.uint32()); + break; + case 3: + message.push = MessagePush.decode(reader, reader.uint32()); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): FilterRPC { + return { + requestId: isSet(object.requestId) ? String(object.requestId) : "", + request: isSet(object.request) + ? FilterRequest.fromJSON(object.request) + : undefined, + push: isSet(object.push) ? MessagePush.fromJSON(object.push) : undefined, + }; + }, + + toJSON(message: FilterRPC): unknown { + const obj: any = {}; + message.requestId !== undefined && (obj.requestId = message.requestId); + message.request !== undefined && + (obj.request = message.request + ? FilterRequest.toJSON(message.request) + : undefined); + message.push !== undefined && + (obj.push = message.push ? MessagePush.toJSON(message.push) : undefined); + return obj; + }, + + fromPartial, I>>( + object: I + ): FilterRPC { + const message = createBaseFilterRPC(); + message.requestId = object.requestId ?? ""; + message.request = + object.request !== undefined && object.request !== null + ? FilterRequest.fromPartial(object.request) + : undefined; + message.push = + object.push !== undefined && object.push !== null + ? MessagePush.fromPartial(object.push) + : undefined; + return message; + }, +}; + +type Builtin = + | Date + | Function + | Uint8Array + | string + | number + | boolean + | undefined; + +export type DeepPartial = T extends Builtin + ? T + : T extends Long + ? string | number | Long + : T extends Array + ? Array> + : T extends ReadonlyArray + ? ReadonlyArray> + : T extends {} + ? { [K in keyof T]?: DeepPartial } + : Partial; + +type KeysOfUnion = T extends T ? keyof T : never; +export type Exact = P extends Builtin + ? P + : P & { [K in keyof P]: Exact } & Record< + Exclude>, + never + >; + +if (_m0.util.Long !== Long) { + _m0.util.Long = Long as any; + _m0.configure(); +} + +function isSet(value: any): boolean { + return value !== null && value !== undefined; +} From 4734e4b7c77ed7450b6e980a8491292b952eefb5 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar Date: Tue, 19 Apr 2022 21:51:44 -0700 Subject: [PATCH 02/19] Finish implementation --- src/lib/waku.ts | 27 +++- src/lib/waku_filter/filter_rpc.ts | 46 +++++++ src/lib/waku_filter/index.node.spec.ts | 113 ++++++++++++++++ src/lib/waku_filter/index.ts | 170 +++++++++++++++++++++++++ src/test_utils/nwaku.ts | 1 + 5 files changed, 355 insertions(+), 2 deletions(-) create mode 100644 src/lib/waku_filter/filter_rpc.ts create mode 100644 src/lib/waku_filter/index.node.spec.ts create mode 100644 src/lib/waku_filter/index.ts diff --git a/src/lib/waku.ts b/src/lib/waku.ts index bd82fe3e76..99fb6c9dc6 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -17,6 +17,8 @@ import { Multiaddr, multiaddr } from "multiaddr"; import PeerId from "peer-id"; import { Bootstrap, BootstrapOptions } from "./discovery"; +import { WakuFilter } from "./waku_filter"; +import { FilterCodec } from "./waku_filter"; import { LightPushCodec, WakuLightPush } from "./waku_light_push"; import { DecryptionMethod, WakuMessage } from "./waku_message"; import { RelayCodecs, WakuRelay } from "./waku_relay"; @@ -39,6 +41,7 @@ export enum Protocols { Relay = "relay", Store = "store", LightPush = "lightpush", + Filter = "filter", } export interface CreateOptions { @@ -102,6 +105,7 @@ export class Waku { public libp2p: Libp2p; public relay: WakuRelay; public store: WakuStore; + public filter: WakuFilter; public lightPush: WakuLightPush; private pingKeepAliveTimers: { @@ -115,11 +119,13 @@ export class Waku { options: CreateOptions, libp2p: Libp2p, store: WakuStore, - lightPush: WakuLightPush + lightPush: WakuLightPush, + filter: WakuFilter ) { this.libp2p = libp2p; this.relay = libp2p.pubsub as unknown as WakuRelay; this.store = store; + this.filter = filter; this.lightPush = lightPush; this.pingKeepAliveTimers = {}; this.relayKeepAliveTimers = {}; @@ -220,10 +226,17 @@ export class Waku { pubSubTopic: options?.pubSubTopic, }); const wakuLightPush = new WakuLightPush(libp2p); + const wakuFilter = new WakuFilter(libp2p); await libp2p.start(); - return new Waku(options ? options : {}, libp2p, wakuStore, wakuLightPush); + return new Waku( + options ? options : {}, + libp2p, + wakuStore, + wakuLightPush, + wakuFilter + ); } /** @@ -381,6 +394,16 @@ export class Waku { promises.push(lightPushPromise); } + if (protocols.includes(Protocols.Filter)) { + const filterPromise = (async (): Promise => { + for await (const peer of this.filter.peers) { + dbg("Filter peer found", peer.id.toB58String()); + break; + } + })(); + promises.push(filterPromise); + } + if (timeoutMs) { await rejectOnTimeout( Promise.all(promises), diff --git a/src/lib/waku_filter/filter_rpc.ts b/src/lib/waku_filter/filter_rpc.ts new file mode 100644 index 0000000000..38624bef82 --- /dev/null +++ b/src/lib/waku_filter/filter_rpc.ts @@ -0,0 +1,46 @@ +import { Reader } from "protobufjs/minimal"; +import { v4 as uuid } from "uuid"; + +import * as proto from "../../proto/waku/v2/filter"; + +export type ContentFilter = { + contentTopic: string; +}; + +export class FilterRPC { + public constructor(public proto: proto.FilterRPC) {} + + static createRequest( + topic: string, + contentFilters: ContentFilter[], + requestId?: string, + subscribe = true + ): FilterRPC { + return new FilterRPC({ + requestId: requestId || uuid(), + request: { + subscribe, + topic, + contentFilters, + }, + push: undefined, + }); + } + + static decode(bytes: Uint8Array): FilterRPC { + const res = proto.FilterRPC.decode(Reader.create(bytes)); + return new FilterRPC(res); + } + + encode(): Uint8Array { + return proto.FilterRPC.encode(this.proto).finish(); + } + + get push(): proto.MessagePush | undefined { + return this.proto.push; + } + + get requestId(): string { + return this.proto.requestId; + } +} diff --git a/src/lib/waku_filter/index.node.spec.ts b/src/lib/waku_filter/index.node.spec.ts new file mode 100644 index 0000000000..b7468eb0ca --- /dev/null +++ b/src/lib/waku_filter/index.node.spec.ts @@ -0,0 +1,113 @@ +import { expect } from "chai"; +import debug from "debug"; + +import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils"; +import { delay } from "../../test_utils/delay"; +import { Protocols, Waku } from "../waku"; +import { WakuMessage } from "../waku_message"; + +const log = debug("waku:test"); + +const TestContentTopic = "/test/1/waku-filter"; + +describe("Waku Filter", () => { + let waku: Waku; + let nwaku: Nwaku; + + afterEach(async function () { + !!nwaku && nwaku.stop(); + !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); + }); + + beforeEach(async function () { + this.timeout(10000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ filter: true }); + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.waitForRemotePeer([Protocols.Filter, Protocols.Relay]); + }); + + it("creates a subscription", async function () { + this.timeout(10000); + + let messageCount = 0; + const messageText = "Filtering works!"; + const callback = (msg: WakuMessage): void => { + log("Got a message"); + messageCount++; + expect(msg.contentTopic).to.eq(TestContentTopic); + expect(msg.payloadAsUtf8).to.eq(messageText); + }; + await waku.filter.subscribe( + { contentTopics: [TestContentTopic] }, + callback + ); + const message = await WakuMessage.fromUtf8String( + messageText, + TestContentTopic + ); + waku.relay.send(message); + while (messageCount === 0) { + await delay(250); + } + expect(messageCount).to.eq(1); + }); + + it("handles multiple messages", async function () { + this.timeout(10000); + + let messageCount = 0; + const callback = (msg: WakuMessage): void => { + messageCount++; + expect(msg.contentTopic).to.eq(TestContentTopic); + }; + await waku.filter.subscribe( + { contentTopics: [TestContentTopic] }, + callback + ); + waku.relay.send( + await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic) + ); + waku.relay.send( + await WakuMessage.fromUtf8String( + "Filtering still works!", + TestContentTopic + ) + ); + while (messageCount < 2) { + await delay(250); + } + expect(messageCount).to.eq(2); + }); + + it("unsubscribes", async function () { + let messageCount = 0; + const callback = (): void => { + messageCount++; + }; + const unsubscribe = await waku.filter.subscribe( + { contentTopics: [TestContentTopic] }, + callback + ); + waku.relay.send( + await WakuMessage.fromUtf8String( + "This should be received", + TestContentTopic + ) + ); + await delay(100); + await unsubscribe(); + waku.relay.send( + await WakuMessage.fromUtf8String( + "This should not be received", + TestContentTopic + ) + ); + await delay(100); + expect(messageCount).to.eq(1); + }); +}); diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts new file mode 100644 index 0000000000..0f96105d32 --- /dev/null +++ b/src/lib/waku_filter/index.ts @@ -0,0 +1,170 @@ +import debug from "debug"; +// import concat from "it-concat"; +import lp from "it-length-prefixed"; +import { pipe } from "it-pipe"; +import Libp2p from "libp2p"; +import { Peer, PeerId } from "libp2p/src/peer-store"; + +import { WakuMessage as WakuMessageProto } from "../../proto/waku/v2/message"; +import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; +import { DefaultPubSubTopic } from "../waku"; +import { DecryptionMethod, WakuMessage } from "../waku_message/index"; + +import { ContentFilter, FilterRPC } from "./filter_rpc"; + +export const FilterCodec = "/vac/waku/filter/2.0.0-beta1"; + +const log = debug("waku:filter"); + +type FilterSubscriptionOpts = { + topic?: string; + peerId?: PeerId; + contentTopics: string[]; +}; + +type FilterCallback = (msg: WakuMessage) => void | Promise; + +type UnsubscribeFunction = () => Promise; + +export class WakuFilter { + private subscriptions: { + [requestId: string]: FilterCallback; + }; + public decryptionKeys: Map< + Uint8Array, + { method?: DecryptionMethod; contentTopics?: string[] } + >; + constructor(public libp2p: Libp2p) { + this.libp2p.handle(FilterCodec, this.onRequest.bind(this)); + this.subscriptions = {}; + this.decryptionKeys = new Map(); + } + + async subscribe( + opts: FilterSubscriptionOpts, + callback: FilterCallback + ): Promise { + const topic = opts.topic || DefaultPubSubTopic; + const contentFilters = opts.contentTopics.map((contentTopic) => ({ + contentTopic, + })); + const request = FilterRPC.createRequest( + topic, + contentFilters, + undefined, + true + ); + const peer = await this.getPeer(); + const connection = this.libp2p.connectionManager.get(peer.id); + if (!connection) { + throw "Failed to get a connection to the peer"; + } + + const { stream } = await connection.newStream(FilterCodec); + try { + await pipe([request.encode()], lp.encode(), stream.sink); + } catch (e) { + log("Error subscribing", e); + } + + this.addCallback(request.requestId, callback); + + return async () => { + await this.unsubscribe(topic, contentFilters, request.requestId, peer); + this.removeCallback(request.requestId); + }; + } + + private async onRequest({ stream }: Libp2p.HandlerProps): Promise { + log("Receiving message push"); + try { + await pipe( + stream.source, + lp.decode(), + async (source: AsyncIterable) => { + for await (const bytes of source) { + const res = FilterRPC.decode(bytes.slice()); + if (res.push?.messages?.length) { + await this.pushMessages(res.requestId, res.push.messages); + } + } + } + ); + } catch (e) { + log("Error decoding message", e); + } + } + + private async pushMessages( + requestId: string, + messages: WakuMessageProto[] + ): Promise { + const callback = this.subscriptions[requestId]; + if (!callback) { + console.warn(`No callback registered for request ID ${requestId}`); + return; + } + for (const message of messages) { + const decoded = await WakuMessage.decodeProto(message, []); + if (!decoded) { + console.error("Not able to decode message"); + continue; + } + callback(decoded); + } + } + + private addCallback(requestId: string, callback: FilterCallback): void { + this.subscriptions[requestId] = callback; + } + + private removeCallback(requestId: string): void { + delete this.subscriptions[requestId]; + } + + private async unsubscribe( + topic: string, + contentFilters: ContentFilter[], + requestId: string, + peer: Peer + ): Promise { + const unsubscribeRequest = FilterRPC.createRequest( + topic, + contentFilters, + requestId, + false + ); + const connection = this.libp2p.connectionManager.get(peer.id); + if (!connection) { + throw "Failed to get a connection to the peer"; + } + const { stream } = await connection.newStream(FilterCodec); + try { + await pipe([unsubscribeRequest.encode()], lp.encode(), stream.sink); + } catch (e) { + console.error("Error unsubscribing", e); + } + } + + private async getPeer(peerId?: PeerId): Promise { + let peer; + if (peerId) { + peer = await this.libp2p.peerStore.get(peerId); + if (!peer) + throw `Failed to retrieve connection details for provided peer in peer store: ${peerId.toB58String()}`; + } else { + peer = await this.randomPeer; + if (!peer) + throw "Failed to find known peer that registers waku store protocol"; + } + return peer; + } + + get peers(): AsyncIterable { + return getPeersForProtocol(this.libp2p, [FilterCodec]); + } + + get randomPeer(): Promise { + return selectRandomPeer(this.peers); + } +} diff --git a/src/test_utils/nwaku.ts b/src/test_utils/nwaku.ts index 21d91285ba..e428730cb1 100644 --- a/src/test_utils/nwaku.ts +++ b/src/test_utils/nwaku.ts @@ -33,6 +33,7 @@ export interface Args { relay?: boolean; rpc?: boolean; rpcAdmin?: boolean; + filter?: boolean; nodekey?: string; portsShift?: number; logLevel?: LogLevel; From 5faa186b34007b98270bf6b79c68f244f3278bfc Mon Sep 17 00:00:00 2001 From: Nicholas Molnar Date: Mon, 16 May 2022 08:55:02 -0700 Subject: [PATCH 03/19] Add decryption keys --- src/lib/waku_filter/index.ts | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index 0f96105d32..94548af4a7 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -1,5 +1,4 @@ import debug from "debug"; -// import concat from "it-concat"; import lp from "it-length-prefixed"; import { pipe } from "it-pipe"; import Libp2p from "libp2p"; @@ -62,7 +61,7 @@ export class WakuFilter { const { stream } = await connection.newStream(FilterCodec); try { - await pipe([request.encode()], lp.encode(), stream.sink); + await pipe([request.encode()], lp.encode(), stream); } catch (e) { log("Error subscribing", e); } @@ -104,8 +103,19 @@ export class WakuFilter { console.warn(`No callback registered for request ID ${requestId}`); return; } + + const decryptionKeys = Array.from(this.decryptionKeys).map( + ([key, { method, contentTopics }]) => { + return { + key, + method, + contentTopics, + }; + } + ); + for (const message of messages) { - const decoded = await WakuMessage.decodeProto(message, []); + const decoded = await WakuMessage.decodeProto(message, decryptionKeys); if (!decoded) { console.error("Not able to decode message"); continue; From c77b74c10ce8741ba5f015c8baea6179caf3eafd Mon Sep 17 00:00:00 2001 From: Nicholas Molnar Date: Mon, 16 May 2022 08:57:26 -0700 Subject: [PATCH 04/19] Add/delete decryption keys --- src/lib/waku.ts | 2 ++ src/lib/waku_filter/index.ts | 25 +++++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 99fb6c9dc6..b38573487f 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -310,6 +310,7 @@ export class Waku { ): void { this.relay.addDecryptionKey(key, options); this.store.addDecryptionKey(key, options); + this.filter.addDecryptionKey(key, options); } /** @@ -321,6 +322,7 @@ export class Waku { deleteDecryptionKey(key: Uint8Array | string): void { this.relay.deleteDecryptionKey(key); this.store.deleteDecryptionKey(key); + this.filter.deleteDecryptionKey(key); } /** diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index 94548af4a7..939f05c8d7 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -6,6 +6,7 @@ import { Peer, PeerId } from "libp2p/src/peer-store"; import { WakuMessage as WakuMessageProto } from "../../proto/waku/v2/message"; import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; +import { hexToBytes } from "../utils"; import { DefaultPubSubTopic } from "../waku"; import { DecryptionMethod, WakuMessage } from "../waku_message/index"; @@ -170,6 +171,30 @@ export class WakuFilter { return peer; } + /** + * Register a decryption key to attempt decryption of messages received in any + * subsequent [[subscribe]] call. This can either be a private key for + * asymmetric encryption or a symmetric key. [[WakuStore]] will attempt to + * decrypt messages using both methods. + * + * Strings must be in hex format. + */ + addDecryptionKey( + key: Uint8Array | string, + options?: { method?: DecryptionMethod; contentTopics?: string[] } + ): void { + this.decryptionKeys.set(hexToBytes(key), options ?? {}); + } + + /**cursorV2Beta4 + * Delete a decryption key so that it cannot be used in future [[subscribe]] calls + * + * Strings must be in hex format. + */ + deleteDecryptionKey(key: Uint8Array | string): void { + this.decryptionKeys.delete(hexToBytes(key)); + } + get peers(): AsyncIterable { return getPeersForProtocol(this.libp2p, [FilterCodec]); } From 41f01c6d601e201cc9aa2befa299a49e55ef7f25 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar Date: Mon, 16 May 2022 10:19:48 -0700 Subject: [PATCH 05/19] Fix comment --- src/lib/waku_filter/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index 939f05c8d7..2bfafb3bf5 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -166,7 +166,7 @@ export class WakuFilter { } else { peer = await this.randomPeer; if (!peer) - throw "Failed to find known peer that registers waku store protocol"; + throw "Failed to find known peer that registers waku filter protocol"; } return peer; } From 971d080ab5f21e6705483297956404aa93a3cdc3 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar Date: Mon, 16 May 2022 10:30:27 -0700 Subject: [PATCH 06/19] Code cleanup --- src/lib/waku_filter/index.ts | 46 ++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index 2bfafb3bf5..562a19efd8 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -1,7 +1,7 @@ import debug from "debug"; import lp from "it-length-prefixed"; import { pipe } from "it-pipe"; -import Libp2p from "libp2p"; +import Libp2p, { MuxedStream } from "libp2p"; import { Peer, PeerId } from "libp2p/src/peer-store"; import { WakuMessage as WakuMessageProto } from "../../proto/waku/v2/message"; @@ -34,6 +34,7 @@ export class WakuFilter { Uint8Array, { method?: DecryptionMethod; contentTopics?: string[] } >; + constructor(public libp2p: Libp2p) { this.libp2p.handle(FilterCodec, this.onRequest.bind(this)); this.subscriptions = {}; @@ -54,17 +55,15 @@ export class WakuFilter { undefined, true ); - const peer = await this.getPeer(); - const connection = this.libp2p.connectionManager.get(peer.id); - if (!connection) { - throw "Failed to get a connection to the peer"; - } - const { stream } = await connection.newStream(FilterCodec); + const peer = await this.getPeer(); + const stream = await this.newStream(peer); + try { await pipe([request.encode()], lp.encode(), stream); } catch (e) { log("Error subscribing", e); + throw e; } this.addCallback(request.requestId, callback); @@ -101,7 +100,7 @@ export class WakuFilter { ): Promise { const callback = this.subscriptions[requestId]; if (!callback) { - console.warn(`No callback registered for request ID ${requestId}`); + log(`No callback registered for request ID ${requestId}`); return; } @@ -118,7 +117,7 @@ export class WakuFilter { for (const message of messages) { const decoded = await WakuMessage.decodeProto(message, decryptionKeys); if (!decoded) { - console.error("Not able to decode message"); + log("Not able to decode message"); continue; } callback(decoded); @@ -145,28 +144,39 @@ export class WakuFilter { requestId, false ); - const connection = this.libp2p.connectionManager.get(peer.id); - if (!connection) { - throw "Failed to get a connection to the peer"; - } - const { stream } = await connection.newStream(FilterCodec); + const stream = await this.newStream(peer); try { await pipe([unsubscribeRequest.encode()], lp.encode(), stream.sink); } catch (e) { - console.error("Error unsubscribing", e); + log("Error unsubscribing", e); + throw e; } } + private async newStream(peer: Peer): Promise { + const connection = this.libp2p.connectionManager.get(peer.id); + if (!connection) { + throw new Error("Failed to get a connection to the peer"); + } + + const { stream } = await connection.newStream(FilterCodec); + return stream; + } + private async getPeer(peerId?: PeerId): Promise { let peer; if (peerId) { peer = await this.libp2p.peerStore.get(peerId); if (!peer) - throw `Failed to retrieve connection details for provided peer in peer store: ${peerId.toB58String()}`; + throw new Error( + `Failed to retrieve connection details for provided peer in peer store: ${peerId.toB58String()}` + ); } else { peer = await this.randomPeer; if (!peer) - throw "Failed to find known peer that registers waku filter protocol"; + throw new Error( + "Failed to find known peer that registers waku filter protocol" + ); } return peer; } @@ -186,7 +196,7 @@ export class WakuFilter { this.decryptionKeys.set(hexToBytes(key), options ?? {}); } - /**cursorV2Beta4 + /** * Delete a decryption key so that it cannot be used in future [[subscribe]] calls * * Strings must be in hex format. From c534bd6cffa7848eab2bc737391d2d355b72096c Mon Sep 17 00:00:00 2001 From: Nicholas Molnar Date: Mon, 16 May 2022 10:57:50 -0700 Subject: [PATCH 07/19] Add more comments --- src/lib/waku_filter/filter_rpc.ts | 12 ++++++++++++ src/lib/waku_filter/index.ts | 29 ++++++++++++++++++++++++++--- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/src/lib/waku_filter/filter_rpc.ts b/src/lib/waku_filter/filter_rpc.ts index 38624bef82..6ce3b7bc23 100644 --- a/src/lib/waku_filter/filter_rpc.ts +++ b/src/lib/waku_filter/filter_rpc.ts @@ -7,6 +7,9 @@ export type ContentFilter = { contentTopic: string; }; +/** + * FilterRPC represents a message conforming to the Waku Filter protocol + */ export class FilterRPC { public constructor(public proto: proto.FilterRPC) {} @@ -27,11 +30,20 @@ export class FilterRPC { }); } + /** + * + * @param bytes Uint8Array of bytes from a FilterRPC message + * @returns FilterRPC + */ static decode(bytes: Uint8Array): FilterRPC { const res = proto.FilterRPC.decode(Reader.create(bytes)); return new FilterRPC(res); } + /** + * Encode the current FilterRPC request to bytes + * @returns Uint8Array + */ encode(): Uint8Array { return proto.FilterRPC.encode(this.proto).finish(); } diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index 562a19efd8..ee71fb8f07 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -17,8 +17,17 @@ export const FilterCodec = "/vac/waku/filter/2.0.0-beta1"; const log = debug("waku:filter"); type FilterSubscriptionOpts = { + /** + * The Pubsub topic for the subscription + */ topic?: string; + /** + * Optionally specify a PeerId for the subscription. If not included, will use a random peer + */ peerId?: PeerId; + /** + * Array of ContentTopics to subscribe to. If empty, will subscribe to all messages on the network + */ contentTopics: string[]; }; @@ -26,6 +35,11 @@ type FilterCallback = (msg: WakuMessage) => void | Promise; type UnsubscribeFunction = () => Promise; +/** + * Implements part of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/). + * + * WakuFilter can be used as a light filter node, but cannot currently be used as a full node that pushes messages to clients + */ export class WakuFilter { private subscriptions: { [requestId: string]: FilterCallback; @@ -36,11 +50,17 @@ export class WakuFilter { >; constructor(public libp2p: Libp2p) { - this.libp2p.handle(FilterCodec, this.onRequest.bind(this)); this.subscriptions = {}; this.decryptionKeys = new Map(); + this.libp2p.handle(FilterCodec, this.onRequest.bind(this)); } + /** + * + * @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to + * @param callback A function that will be called on each message returned by the filter + * @returns Unsubscribe function that can be used to end the subscription + */ async subscribe( opts: FilterSubscriptionOpts, callback: FilterCallback @@ -144,6 +164,7 @@ export class WakuFilter { requestId, false ); + const stream = await this.newStream(peer); try { await pipe([unsubscribeRequest.encode()], lp.encode(), stream.sink); @@ -167,16 +188,18 @@ export class WakuFilter { let peer; if (peerId) { peer = await this.libp2p.peerStore.get(peerId); - if (!peer) + if (!peer) { throw new Error( `Failed to retrieve connection details for provided peer in peer store: ${peerId.toB58String()}` ); + } } else { peer = await this.randomPeer; - if (!peer) + if (!peer) { throw new Error( "Failed to find known peer that registers waku filter protocol" ); + } } return peer; } From cd71beb26b0361b7c483ada292dfb60e1121ffa3 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar Date: Mon, 16 May 2022 11:00:35 -0700 Subject: [PATCH 08/19] Fix more comments --- src/lib/waku_filter/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index ee71fb8f07..a0b8689325 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -22,11 +22,11 @@ type FilterSubscriptionOpts = { */ topic?: string; /** - * Optionally specify a PeerId for the subscription. If not included, will use a random peer + * Optionally specify a PeerId for the subscription. If not included, will use a random peer. */ peerId?: PeerId; /** - * Array of ContentTopics to subscribe to. If empty, will subscribe to all messages on the network + * Array of ContentTopics to subscribe to. If empty, no messages will be returned from the filter. */ contentTopics: string[]; }; @@ -38,7 +38,7 @@ type UnsubscribeFunction = () => Promise; /** * Implements part of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/). * - * WakuFilter can be used as a light filter node, but cannot currently be used as a full node that pushes messages to clients + * WakuFilter can be used as a light filter node, but cannot currently be used as a full node that pushes messages to clients. */ export class WakuFilter { private subscriptions: { From 8aa966c2f6eaf5351ff55a91fbbb22ffaf6cd2d3 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar Date: Mon, 16 May 2022 12:34:31 -0700 Subject: [PATCH 09/19] Consolidate imports --- src/lib/waku.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/lib/waku.ts b/src/lib/waku.ts index b38573487f..a9b66d8d0f 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -17,8 +17,7 @@ import { Multiaddr, multiaddr } from "multiaddr"; import PeerId from "peer-id"; import { Bootstrap, BootstrapOptions } from "./discovery"; -import { WakuFilter } from "./waku_filter"; -import { FilterCodec } from "./waku_filter"; +import { FilterCodec, WakuFilter } from "./waku_filter"; import { LightPushCodec, WakuLightPush } from "./waku_light_push"; import { DecryptionMethod, WakuMessage } from "./waku_message"; import { RelayCodecs, WakuRelay } from "./waku_relay"; From e3015abce9b9d2b8043b107e9792615ccda92c51 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 26 May 2022 14:48:33 +1000 Subject: [PATCH 10/19] Add filter to `dial` --- src/lib/waku.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/lib/waku.ts b/src/lib/waku.ts index a9b66d8d0f..3df90f8476 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -265,6 +265,9 @@ export class Waku { if (_protocols.includes(Protocols.LightPush)) { codecs.push(LightPushCodec); } + if (_protocols.includes(Protocols.Filter)) { + codecs.push(FilterCodec); + } return this.libp2p.dialProtocol(peer, codecs); } From 4c640e0f9335250f01d15612355b87677317007f Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 26 May 2022 14:49:03 +1000 Subject: [PATCH 11/19] Remove due `filter` --- src/test_utils/nwaku.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test_utils/nwaku.ts b/src/test_utils/nwaku.ts index e428730cb1..21d91285ba 100644 --- a/src/test_utils/nwaku.ts +++ b/src/test_utils/nwaku.ts @@ -33,7 +33,6 @@ export interface Args { relay?: boolean; rpc?: boolean; rpcAdmin?: boolean; - filter?: boolean; nodekey?: string; portsShift?: number; logLevel?: LogLevel; From f7613febed8e73817ddc930e37e1a86e468eeb6a Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 26 May 2022 14:51:01 +1000 Subject: [PATCH 12/19] Simplify import --- src/lib/waku_filter/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index a0b8689325..c015ac3558 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -8,7 +8,7 @@ import { WakuMessage as WakuMessageProto } from "../../proto/waku/v2/message"; import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; import { hexToBytes } from "../utils"; import { DefaultPubSubTopic } from "../waku"; -import { DecryptionMethod, WakuMessage } from "../waku_message/index"; +import { DecryptionMethod, WakuMessage } from "../waku_message"; import { ContentFilter, FilterRPC } from "./filter_rpc"; From e5dde6f9e94d8288681c196c9ab22eddf367a756 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 26 May 2022 15:00:22 +1000 Subject: [PATCH 13/19] Minor fixes --- src/lib/waku_filter/index.ts | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index c015ac3558..d0e501c210 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -20,7 +20,7 @@ type FilterSubscriptionOpts = { /** * The Pubsub topic for the subscription */ - topic?: string; + pubsubTopic?: string; /** * Optionally specify a PeerId for the subscription. If not included, will use a random peer. */ @@ -65,7 +65,7 @@ export class WakuFilter { opts: FilterSubscriptionOpts, callback: FilterCallback ): Promise { - const topic = opts.topic || DefaultPubSubTopic; + const topic = opts.pubsubTopic || DefaultPubSubTopic; const contentFilters = opts.contentTopics.map((contentTopic) => ({ contentTopic, })); @@ -76,13 +76,20 @@ export class WakuFilter { true ); - const peer = await this.getPeer(); + const peer = await this.getPeer(opts.peerId); const stream = await this.newStream(peer); try { await pipe([request.encode()], lp.encode(), stream); } catch (e) { - log("Error subscribing", e); + log( + "Error subscribing to peer ", + peer.id.toB58String(), + "for content topics", + opts.contentTopics, + ": ", + e + ); throw e; } From 4cf3d3ffd8f0304f1ad05fb7c91fa561bd4a7a16 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 26 May 2022 15:05:18 +1000 Subject: [PATCH 14/19] Add some awaits --- src/lib/waku_filter/index.node.spec.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lib/waku_filter/index.node.spec.ts b/src/lib/waku_filter/index.node.spec.ts index b7468eb0ca..db14526a58 100644 --- a/src/lib/waku_filter/index.node.spec.ts +++ b/src/lib/waku_filter/index.node.spec.ts @@ -50,7 +50,7 @@ describe("Waku Filter", () => { messageText, TestContentTopic ); - waku.relay.send(message); + await waku.relay.send(message); while (messageCount === 0) { await delay(250); } @@ -69,10 +69,10 @@ describe("Waku Filter", () => { { contentTopics: [TestContentTopic] }, callback ); - waku.relay.send( + await waku.relay.send( await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic) ); - waku.relay.send( + await waku.relay.send( await WakuMessage.fromUtf8String( "Filtering still works!", TestContentTopic @@ -93,7 +93,7 @@ describe("Waku Filter", () => { { contentTopics: [TestContentTopic] }, callback ); - waku.relay.send( + await waku.relay.send( await WakuMessage.fromUtf8String( "This should be received", TestContentTopic @@ -101,7 +101,7 @@ describe("Waku Filter", () => { ); await delay(100); await unsubscribe(); - waku.relay.send( + await waku.relay.send( await WakuMessage.fromUtf8String( "This should not be received", TestContentTopic From 5687908ca065407aaedd2e62c5b58ef64a720456 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 26 May 2022 16:03:46 +1000 Subject: [PATCH 15/19] Reorder parameters To match `waku.relay.addObserver`. --- src/lib/waku_filter/index.node.spec.ts | 17 +++++------------ src/lib/waku_filter/index.ts | 25 +++++++++++-------------- 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/src/lib/waku_filter/index.node.spec.ts b/src/lib/waku_filter/index.node.spec.ts index db14526a58..a9dc05e3dc 100644 --- a/src/lib/waku_filter/index.node.spec.ts +++ b/src/lib/waku_filter/index.node.spec.ts @@ -42,10 +42,7 @@ describe("Waku Filter", () => { expect(msg.contentTopic).to.eq(TestContentTopic); expect(msg.payloadAsUtf8).to.eq(messageText); }; - await waku.filter.subscribe( - { contentTopics: [TestContentTopic] }, - callback - ); + await waku.filter.subscribe(callback, [TestContentTopic]); const message = await WakuMessage.fromUtf8String( messageText, TestContentTopic @@ -65,10 +62,7 @@ describe("Waku Filter", () => { messageCount++; expect(msg.contentTopic).to.eq(TestContentTopic); }; - await waku.filter.subscribe( - { contentTopics: [TestContentTopic] }, - callback - ); + await waku.filter.subscribe(callback, [TestContentTopic]); await waku.relay.send( await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic) ); @@ -89,10 +83,9 @@ describe("Waku Filter", () => { const callback = (): void => { messageCount++; }; - const unsubscribe = await waku.filter.subscribe( - { contentTopics: [TestContentTopic] }, - callback - ); + const unsubscribe = await waku.filter.subscribe(callback, [ + TestContentTopic, + ]); await waku.relay.send( await WakuMessage.fromUtf8String( "This should be received", diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index d0e501c210..883e5307cb 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -25,10 +25,6 @@ type FilterSubscriptionOpts = { * Optionally specify a PeerId for the subscription. If not included, will use a random peer. */ peerId?: PeerId; - /** - * Array of ContentTopics to subscribe to. If empty, no messages will be returned from the filter. - */ - contentTopics: string[]; }; type FilterCallback = (msg: WakuMessage) => void | Promise; @@ -56,17 +52,18 @@ export class WakuFilter { } /** - * - * @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to - * @param callback A function that will be called on each message returned by the filter - * @returns Unsubscribe function that can be used to end the subscription + * @param contentTopics Array of ContentTopics to subscribe to. If empty, no messages will be returned from the filter. + * @param callback A function that will be called on each message returned by the filter. + * @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to. + * @returns Unsubscribe function that can be used to end the subscription. */ async subscribe( - opts: FilterSubscriptionOpts, - callback: FilterCallback + callback: FilterCallback, + contentTopics: string[], + opts?: FilterSubscriptionOpts ): Promise { - const topic = opts.pubsubTopic || DefaultPubSubTopic; - const contentFilters = opts.contentTopics.map((contentTopic) => ({ + const topic = opts?.pubsubTopic || DefaultPubSubTopic; + const contentFilters = contentTopics.map((contentTopic) => ({ contentTopic, })); const request = FilterRPC.createRequest( @@ -76,7 +73,7 @@ export class WakuFilter { true ); - const peer = await this.getPeer(opts.peerId); + const peer = await this.getPeer(opts?.peerId); const stream = await this.newStream(peer); try { @@ -86,7 +83,7 @@ export class WakuFilter { "Error subscribing to peer ", peer.id.toB58String(), "for content topics", - opts.contentTopics, + contentTopics, ": ", e ); From 2e09b6dec36e0698ab7614cc37b6dc9a96518e5d Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 26 May 2022 16:05:36 +1000 Subject: [PATCH 16/19] Use Map Preferred to object when possible. --- src/lib/waku_filter/index.ts | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index 883e5307cb..16785b339f 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -37,16 +37,14 @@ type UnsubscribeFunction = () => Promise; * WakuFilter can be used as a light filter node, but cannot currently be used as a full node that pushes messages to clients. */ export class WakuFilter { - private subscriptions: { - [requestId: string]: FilterCallback; - }; + private subscriptions: Map; public decryptionKeys: Map< Uint8Array, { method?: DecryptionMethod; contentTopics?: string[] } >; constructor(public libp2p: Libp2p) { - this.subscriptions = {}; + this.subscriptions = new Map(); this.decryptionKeys = new Map(); this.libp2p.handle(FilterCodec, this.onRequest.bind(this)); } @@ -122,7 +120,7 @@ export class WakuFilter { requestId: string, messages: WakuMessageProto[] ): Promise { - const callback = this.subscriptions[requestId]; + const callback = this.subscriptions.get(requestId); if (!callback) { log(`No callback registered for request ID ${requestId}`); return; @@ -149,11 +147,11 @@ export class WakuFilter { } private addCallback(requestId: string, callback: FilterCallback): void { - this.subscriptions[requestId] = callback; + this.subscriptions.set(requestId, callback); } private removeCallback(requestId: string): void { - delete this.subscriptions[requestId]; + this.subscriptions.delete(requestId); } private async unsubscribe( From b93e535af90eca1aed70d80ba2e0bb5e53a44cd7 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Fri, 27 May 2022 20:19:35 +1000 Subject: [PATCH 17/19] Add wait for remote peer (filter) test --- src/lib/waku.node.spec.ts | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/lib/waku.node.spec.ts b/src/lib/waku.node.spec.ts index d854c634d4..134235dc0e 100644 --- a/src/lib/waku.node.spec.ts +++ b/src/lib/waku.node.spec.ts @@ -305,4 +305,27 @@ describe("Wait for remote peer / get peers", function () { expect(nimPeerId).to.not.be.undefined; expect(peers.includes(nimPeerId as string)).to.be.true; }); + + it("Filter", async function () { + this.timeout(20_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ filter: true }); + const multiAddrWithId = await nwaku.getMultiaddrWithId(); + + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.dial(multiAddrWithId); + await waku.waitForRemotePeer([Protocols.Filter]); + + const peers = []; + for await (const peer of waku.filter.peers) { + peers.push(peer.id.toB58String()); + } + + const nimPeerId = multiAddrWithId.getPeerId(); + + expect(nimPeerId).to.not.be.undefined; + expect(peers.includes(nimPeerId as string)).to.be.true; + }); }); From d3082c8dd89cb5f3d48f7d065d11bf07bf83b4c4 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 26 May 2022 14:49:51 +1000 Subject: [PATCH 18/19] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7439ff2fd4..1fa0bcf34f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `waitForRemotePeer` now accepts a `timeoutMs` parameter that rejects the promise if it is reached. By default, no timeout is applied. +- **Experimental** support for the [Waku Filter](https://rfc.vac.dev/spec/12/) protocol (client side) added, currently only works in NodeJS. ### Changed From 7a2dcd98818591f8ca9b5df9ea66d385453772f7 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Fri, 27 May 2022 20:23:43 +1000 Subject: [PATCH 19/19] Update comments --- src/lib/waku_filter/index.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index 16785b339f..dbc09cc37e 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -32,9 +32,11 @@ type FilterCallback = (msg: WakuMessage) => void | Promise; type UnsubscribeFunction = () => Promise; /** - * Implements part of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/). + * Implements client side of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/). * - * WakuFilter can be used as a light filter node, but cannot currently be used as a full node that pushes messages to clients. + * Note this currently only works in NodeJS when the Waku node is listening on a port, see: + * - https://github.com/status-im/go-waku/issues/245 + * - https://github.com/status-im/nwaku/issues/948 */ export class WakuFilter { private subscriptions: Map;