From 4734e4b7c77ed7450b6e980a8491292b952eefb5 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar Date: Tue, 19 Apr 2022 21:51:44 -0700 Subject: [PATCH] Finish implementation --- src/lib/waku.ts | 27 +++- src/lib/waku_filter/filter_rpc.ts | 46 +++++++ src/lib/waku_filter/index.node.spec.ts | 113 ++++++++++++++++ src/lib/waku_filter/index.ts | 170 +++++++++++++++++++++++++ src/test_utils/nwaku.ts | 1 + 5 files changed, 355 insertions(+), 2 deletions(-) create mode 100644 src/lib/waku_filter/filter_rpc.ts create mode 100644 src/lib/waku_filter/index.node.spec.ts create mode 100644 src/lib/waku_filter/index.ts diff --git a/src/lib/waku.ts b/src/lib/waku.ts index bd82fe3e76..99fb6c9dc6 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -17,6 +17,8 @@ import { Multiaddr, multiaddr } from "multiaddr"; import PeerId from "peer-id"; import { Bootstrap, BootstrapOptions } from "./discovery"; +import { WakuFilter } from "./waku_filter"; +import { FilterCodec } from "./waku_filter"; import { LightPushCodec, WakuLightPush } from "./waku_light_push"; import { DecryptionMethod, WakuMessage } from "./waku_message"; import { RelayCodecs, WakuRelay } from "./waku_relay"; @@ -39,6 +41,7 @@ export enum Protocols { Relay = "relay", Store = "store", LightPush = "lightpush", + Filter = "filter", } export interface CreateOptions { @@ -102,6 +105,7 @@ export class Waku { public libp2p: Libp2p; public relay: WakuRelay; public store: WakuStore; + public filter: WakuFilter; public lightPush: WakuLightPush; private pingKeepAliveTimers: { @@ -115,11 +119,13 @@ export class Waku { options: CreateOptions, libp2p: Libp2p, store: WakuStore, - lightPush: WakuLightPush + lightPush: WakuLightPush, + filter: WakuFilter ) { this.libp2p = libp2p; this.relay = libp2p.pubsub as unknown as WakuRelay; this.store = store; + this.filter = filter; this.lightPush = lightPush; this.pingKeepAliveTimers = {}; this.relayKeepAliveTimers = {}; @@ -220,10 +226,17 @@ export class Waku { pubSubTopic: options?.pubSubTopic, }); const wakuLightPush = new WakuLightPush(libp2p); + const wakuFilter = new WakuFilter(libp2p); await libp2p.start(); - return new Waku(options ? options : {}, libp2p, wakuStore, wakuLightPush); + return new Waku( + options ? options : {}, + libp2p, + wakuStore, + wakuLightPush, + wakuFilter + ); } /** @@ -381,6 +394,16 @@ export class Waku { promises.push(lightPushPromise); } + if (protocols.includes(Protocols.Filter)) { + const filterPromise = (async (): Promise => { + for await (const peer of this.filter.peers) { + dbg("Filter peer found", peer.id.toB58String()); + break; + } + })(); + promises.push(filterPromise); + } + if (timeoutMs) { await rejectOnTimeout( Promise.all(promises), diff --git a/src/lib/waku_filter/filter_rpc.ts b/src/lib/waku_filter/filter_rpc.ts new file mode 100644 index 0000000000..38624bef82 --- /dev/null +++ b/src/lib/waku_filter/filter_rpc.ts @@ -0,0 +1,46 @@ +import { Reader } from "protobufjs/minimal"; +import { v4 as uuid } from "uuid"; + +import * as proto from "../../proto/waku/v2/filter"; + +export type ContentFilter = { + contentTopic: string; +}; + +export class FilterRPC { + public constructor(public proto: proto.FilterRPC) {} + + static createRequest( + topic: string, + contentFilters: ContentFilter[], + requestId?: string, + subscribe = true + ): FilterRPC { + return new FilterRPC({ + requestId: requestId || uuid(), + request: { + subscribe, + topic, + contentFilters, + }, + push: undefined, + }); + } + + static decode(bytes: Uint8Array): FilterRPC { + const res = proto.FilterRPC.decode(Reader.create(bytes)); + return new FilterRPC(res); + } + + encode(): Uint8Array { + return proto.FilterRPC.encode(this.proto).finish(); + } + + get push(): proto.MessagePush | undefined { + return this.proto.push; + } + + get requestId(): string { + return this.proto.requestId; + } +} diff --git a/src/lib/waku_filter/index.node.spec.ts b/src/lib/waku_filter/index.node.spec.ts new file mode 100644 index 0000000000..b7468eb0ca --- /dev/null +++ b/src/lib/waku_filter/index.node.spec.ts @@ -0,0 +1,113 @@ +import { expect } from "chai"; +import debug from "debug"; + +import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils"; +import { delay } from "../../test_utils/delay"; +import { Protocols, Waku } from "../waku"; +import { WakuMessage } from "../waku_message"; + +const log = debug("waku:test"); + +const TestContentTopic = "/test/1/waku-filter"; + +describe("Waku Filter", () => { + let waku: Waku; + let nwaku: Nwaku; + + afterEach(async function () { + !!nwaku && nwaku.stop(); + !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); + }); + + beforeEach(async function () { + this.timeout(10000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ filter: true }); + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.waitForRemotePeer([Protocols.Filter, Protocols.Relay]); + }); + + it("creates a subscription", async function () { + this.timeout(10000); + + let messageCount = 0; + const messageText = "Filtering works!"; + const callback = (msg: WakuMessage): void => { + log("Got a message"); + messageCount++; + expect(msg.contentTopic).to.eq(TestContentTopic); + expect(msg.payloadAsUtf8).to.eq(messageText); + }; + await waku.filter.subscribe( + { contentTopics: [TestContentTopic] }, + callback + ); + const message = await WakuMessage.fromUtf8String( + messageText, + TestContentTopic + ); + waku.relay.send(message); + while (messageCount === 0) { + await delay(250); + } + expect(messageCount).to.eq(1); + }); + + it("handles multiple messages", async function () { + this.timeout(10000); + + let messageCount = 0; + const callback = (msg: WakuMessage): void => { + messageCount++; + expect(msg.contentTopic).to.eq(TestContentTopic); + }; + await waku.filter.subscribe( + { contentTopics: [TestContentTopic] }, + callback + ); + waku.relay.send( + await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic) + ); + waku.relay.send( + await WakuMessage.fromUtf8String( + "Filtering still works!", + TestContentTopic + ) + ); + while (messageCount < 2) { + await delay(250); + } + expect(messageCount).to.eq(2); + }); + + it("unsubscribes", async function () { + let messageCount = 0; + const callback = (): void => { + messageCount++; + }; + const unsubscribe = await waku.filter.subscribe( + { contentTopics: [TestContentTopic] }, + callback + ); + waku.relay.send( + await WakuMessage.fromUtf8String( + "This should be received", + TestContentTopic + ) + ); + await delay(100); + await unsubscribe(); + waku.relay.send( + await WakuMessage.fromUtf8String( + "This should not be received", + TestContentTopic + ) + ); + await delay(100); + expect(messageCount).to.eq(1); + }); +}); diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts new file mode 100644 index 0000000000..0f96105d32 --- /dev/null +++ b/src/lib/waku_filter/index.ts @@ -0,0 +1,170 @@ +import debug from "debug"; +// import concat from "it-concat"; +import lp from "it-length-prefixed"; +import { pipe } from "it-pipe"; +import Libp2p from "libp2p"; +import { Peer, PeerId } from "libp2p/src/peer-store"; + +import { WakuMessage as WakuMessageProto } from "../../proto/waku/v2/message"; +import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; +import { DefaultPubSubTopic } from "../waku"; +import { DecryptionMethod, WakuMessage } from "../waku_message/index"; + +import { ContentFilter, FilterRPC } from "./filter_rpc"; + +export const FilterCodec = "/vac/waku/filter/2.0.0-beta1"; + +const log = debug("waku:filter"); + +type FilterSubscriptionOpts = { + topic?: string; + peerId?: PeerId; + contentTopics: string[]; +}; + +type FilterCallback = (msg: WakuMessage) => void | Promise; + +type UnsubscribeFunction = () => Promise; + +export class WakuFilter { + private subscriptions: { + [requestId: string]: FilterCallback; + }; + public decryptionKeys: Map< + Uint8Array, + { method?: DecryptionMethod; contentTopics?: string[] } + >; + constructor(public libp2p: Libp2p) { + this.libp2p.handle(FilterCodec, this.onRequest.bind(this)); + this.subscriptions = {}; + this.decryptionKeys = new Map(); + } + + async subscribe( + opts: FilterSubscriptionOpts, + callback: FilterCallback + ): Promise { + const topic = opts.topic || DefaultPubSubTopic; + const contentFilters = opts.contentTopics.map((contentTopic) => ({ + contentTopic, + })); + const request = FilterRPC.createRequest( + topic, + contentFilters, + undefined, + true + ); + const peer = await this.getPeer(); + const connection = this.libp2p.connectionManager.get(peer.id); + if (!connection) { + throw "Failed to get a connection to the peer"; + } + + const { stream } = await connection.newStream(FilterCodec); + try { + await pipe([request.encode()], lp.encode(), stream.sink); + } catch (e) { + log("Error subscribing", e); + } + + this.addCallback(request.requestId, callback); + + return async () => { + await this.unsubscribe(topic, contentFilters, request.requestId, peer); + this.removeCallback(request.requestId); + }; + } + + private async onRequest({ stream }: Libp2p.HandlerProps): Promise { + log("Receiving message push"); + try { + await pipe( + stream.source, + lp.decode(), + async (source: AsyncIterable) => { + for await (const bytes of source) { + const res = FilterRPC.decode(bytes.slice()); + if (res.push?.messages?.length) { + await this.pushMessages(res.requestId, res.push.messages); + } + } + } + ); + } catch (e) { + log("Error decoding message", e); + } + } + + private async pushMessages( + requestId: string, + messages: WakuMessageProto[] + ): Promise { + const callback = this.subscriptions[requestId]; + if (!callback) { + console.warn(`No callback registered for request ID ${requestId}`); + return; + } + for (const message of messages) { + const decoded = await WakuMessage.decodeProto(message, []); + if (!decoded) { + console.error("Not able to decode message"); + continue; + } + callback(decoded); + } + } + + private addCallback(requestId: string, callback: FilterCallback): void { + this.subscriptions[requestId] = callback; + } + + private removeCallback(requestId: string): void { + delete this.subscriptions[requestId]; + } + + private async unsubscribe( + topic: string, + contentFilters: ContentFilter[], + requestId: string, + peer: Peer + ): Promise { + const unsubscribeRequest = FilterRPC.createRequest( + topic, + contentFilters, + requestId, + false + ); + const connection = this.libp2p.connectionManager.get(peer.id); + if (!connection) { + throw "Failed to get a connection to the peer"; + } + const { stream } = await connection.newStream(FilterCodec); + try { + await pipe([unsubscribeRequest.encode()], lp.encode(), stream.sink); + } catch (e) { + console.error("Error unsubscribing", e); + } + } + + private async getPeer(peerId?: PeerId): Promise { + let peer; + if (peerId) { + peer = await this.libp2p.peerStore.get(peerId); + if (!peer) + throw `Failed to retrieve connection details for provided peer in peer store: ${peerId.toB58String()}`; + } else { + peer = await this.randomPeer; + if (!peer) + throw "Failed to find known peer that registers waku store protocol"; + } + return peer; + } + + get peers(): AsyncIterable { + return getPeersForProtocol(this.libp2p, [FilterCodec]); + } + + get randomPeer(): Promise { + return selectRandomPeer(this.peers); + } +} diff --git a/src/test_utils/nwaku.ts b/src/test_utils/nwaku.ts index 21d91285ba..e428730cb1 100644 --- a/src/test_utils/nwaku.ts +++ b/src/test_utils/nwaku.ts @@ -33,6 +33,7 @@ export interface Args { relay?: boolean; rpc?: boolean; rpcAdmin?: boolean; + filter?: boolean; nodekey?: string; portsShift?: number; logLevel?: LogLevel;