diff --git a/packages/core/src/lib/filter/v2/index.ts b/packages/core/src/lib/filter/v2/index.ts new file mode 100644 index 0000000000..7eb4a39fb3 --- /dev/null +++ b/packages/core/src/lib/filter/v2/index.ts @@ -0,0 +1,275 @@ +import type { Libp2p } from "@libp2p/interface-libp2p"; +import type { Peer } from "@libp2p/interface-peer-store"; +import type { IncomingStreamData } from "@libp2p/interface-registrar"; +import type { + ActiveSubscriptions, + Callback, + IDecodedMessage, + IDecoder, + IFilter, + ProtocolCreateOptions, + ProtocolOptions, +} from "@waku/interfaces"; +import { WakuMessage as WakuMessageProto } from "@waku/proto"; +import debug from "debug"; +import all from "it-all"; +import * as lp from "it-length-prefixed"; +import { pipe } from "it-pipe"; +import { Uint8ArrayList } from "uint8arraylist"; + +import { BaseProtocol } from "../../base_protocol.js"; +import { DefaultPubSubTopic } from "../../constants.js"; +import { groupByContentTopic } from "../../group_by.js"; +import { toProtoMessage } from "../../to_proto_message.js"; + +import { + ContentFilter, + FilterPushRpc, + FilterSubscribeResponse, + FilterSubscribeRpc, +} from "./rpc.js"; + +export type UnsubscribeFunction = () => Promise; +export type RequestID = string; + +type Subscription = { + decoders: IDecoder[]; + callback: Callback; + pubSubTopic: string; +}; + +const FilterCodecs = { + SUBSCRIBE: "/vac/waku/filter-subscribe/2.0.0-beta1", + PUSH: "/vac/waku/filter-push/2.0.0-beta1", +}; + +const log = debug("waku:filter_v2"); + +/** + * Implements client side of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/). + * + * Note this currently only works in NodeJS when the Waku node is listening on a port, see: + * - https://github.com/status-im/go-waku/issues/245 + * - https://github.com/status-im/nwaku/issues/948 + */ +class FilterV2 extends BaseProtocol implements IFilter { + options: ProtocolCreateOptions; + private subscriptions: Map; + + constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { + super( + Object.values(FilterCodecs), + libp2p.peerStore, + libp2p.getConnections.bind(libp2p) + ); + this.options = options ?? {}; + this.subscriptions = new Map(); + this.libp2p + .handle(this.multicodecs, this.onRequest.bind(this)) + .catch((e) => log("Failed to register filter protocol", e)); + } + + /** + * @param decoders Decoder or array of Decoders to use to decode messages, it also specifies the content topics. + * @param callback A function that will be called on each message returned by the filter. + * @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to. + * @returns Unsubscribe function that can be used to end the subscription. + */ + async subscribe( + decoders: IDecoder | IDecoder[], + callback: Callback, + opts?: ProtocolOptions + ): Promise { + const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; + const { pubSubTopic = DefaultPubSubTopic } = this.options; + + const contentTopics = Array.from(groupByContentTopic(decodersArray).keys()); + + const contentFilters = contentTopics.map((contentTopic) => ({ + contentTopic, + })); + const request = FilterSubscribeRpc.createSubscribeRequest( + pubSubTopic, + contentFilters.map((contentFilter) => contentFilter.contentTopic), + undefined + ); + + const { requestId } = request; + + const peer = await this.getPeer(opts?.peerId); + const stream = await this.newStream(peer); + + try { + const res = await pipe( + [request.encode()], + lp.encode(), + stream, + lp.decode(), + async (source) => await all(source) + ); + + const bytes = new Uint8ArrayList(); + res.forEach((chunk) => { + bytes.append(chunk); + }); + + const filterResponse = FilterSubscribeResponse.decode(bytes.slice()); + + const { statusCode, statusDesc, requestId } = filterResponse; + + if (statusCode < 200 || statusCode >= 300) { + throw new Error( + `Filter subscribe request failed with status code ${statusCode} and description ${statusDesc} for request ${requestId}` + ); + } + + log("response", res); + } catch (e) { + log( + "Error subscribing to peer ", + peer.id.toString(), + "for content topics", + contentTopics, + ": ", + e + ); + throw e; + } + + const subscription: Subscription = { + callback, + decoders: decodersArray, + pubSubTopic, + }; + this.subscriptions.set(requestId, subscription); + + return async () => { + await this.unsubscribe(pubSubTopic, contentFilters, requestId, peer); + this.subscriptions.delete(requestId); + }; + } + + public getActiveSubscriptions(): ActiveSubscriptions { + const map: ActiveSubscriptions = new Map(); + const subscriptions = this.subscriptions as Map< + RequestID, + Subscription + >; + + for (const item of subscriptions.values()) { + const values = map.get(item.pubSubTopic) || []; + const nextValues = item.decoders.map((decoder) => decoder.contentTopic); + map.set(item.pubSubTopic, [...values, ...nextValues]); + } + + return map; + } + + private onRequest(streamData: IncomingStreamData): void { + log("Receiving message push"); + try { + pipe(streamData.stream, lp.decode(), async (source) => { + for await (const bytes of source) { + const res = FilterPushRpc.decode(bytes.slice()); + const { wakuMessage, pubsubTopic } = res; + + if (!wakuMessage) { + // this should never happen as: + // Each `MessagePush` MUST contain one (and only one) `waku_message`. + // defined https://github.com/vacp2p/rfc/pull/562/files# + throw new Error("No waku message found"); + } + + await this.pushMessages(); + + // if (res.requestId && res.push?.messages?.length) { + // await this.pushMessages(res.requestId, res.push.messages); + // } + } + }).then( + () => { + log("Receiving pipe closed."); + }, + (e) => { + log("Error with receiving pipe", e); + } + ); + } catch (e) { + log("Error decoding message", e); + } + } + + private async pushMessages( + requestId: string, + messages: WakuMessageProto[] + ): Promise { + // const subscription = this.subscriptions.get(requestId) as + // | Subscription + // | undefined; + // if (!subscription) { + // log(`No subscription locally registered for request ID ${requestId}`); + // return; + // } + const { decoders, callback, pubSubTopic } = subscription; + + if (!decoders || !decoders.length) { + log(`No decoder registered for request ID ${requestId}`); + return; + } + + for (const protoMessage of messages) { + const contentTopic = protoMessage.contentTopic; + if (!contentTopic) { + log("Message has no content topic, skipping"); + return; + } + + let didDecodeMsg = false; + // We don't want to wait for decoding failure, just attempt to decode + // all messages and do the call back on the one that works + // noinspection ES6MissingAwait + decoders.forEach(async (dec: IDecoder) => { + if (didDecodeMsg) return; + const decoded = await dec.fromProtoObj( + pubSubTopic, + toProtoMessage(protoMessage) + ); + if (!decoded) { + log("Not able to decode message"); + return; + } + // This is just to prevent more decoding attempt + // TODO: Could be better if we were to abort promises + didDecodeMsg = Boolean(decoded); + await callback(decoded); + }); + } + } + + private async unsubscribe( + topic: string, + contentFilters: ContentFilter[], + requestId: string, + peer: Peer + ): Promise { + const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest( + topic, + contentFilters.map((contentFilter) => contentFilter.contentTopic), + requestId + ); + + const stream = await this.newStream(peer); + try { + await pipe([unsubscribeRequest.encode()], lp.encode(), stream.sink); + } catch (e) { + log("Error unsubscribing", e); + throw e; + } + } +} + +export function wakuFilter( + init: Partial = {} +): (libp2p: Libp2p) => IFilter { + return (libp2p: Libp2p) => new FilterV2(libp2p, init); +} diff --git a/packages/core/src/lib/filter/v2/rpc.ts b/packages/core/src/lib/filter/v2/rpc.ts new file mode 100644 index 0000000000..85993f89e1 --- /dev/null +++ b/packages/core/src/lib/filter/v2/rpc.ts @@ -0,0 +1,170 @@ +import { proto_filter_v2 as proto } from "@waku/proto"; +import { v4 as uuid } from "uuid"; + +/** + * FilterPushRPC represents a message conforming to the Waku FilterPush protocol. + * Protocol documentation: https://rfc.vac.dev/spec/12/ + */ +export class FilterPushRpc { + public constructor(public proto: proto.MessagePushV2) {} + + /** + * Create a FilterPushRPC object with the provided parameters. + * @param wakuMessage The WakuMessage to be pushed. + * @param pubsubTopic The pubsub topic on which the message was published. + * @returns FilterPushRpc + */ + static create(wakuMessage: Uint8Array, pubsubTopic: string): FilterPushRpc { + const message = proto.WakuMessage.decode(wakuMessage); + return new FilterPushRpc({ + wakuMessage: { + payload: wakuMessage, + contentTopic: message.contentTopic, + }, + pubsubTopic: pubsubTopic, + }); + } + + /** + * Decode the given bytes into a FilterPushRpc object. + * @param bytes Uint8Array of bytes from a FilterPushRPC message. + * @returns FilterPushRpc + */ + static decode(bytes: Uint8Array): FilterPushRpc { + const res = proto.MessagePushV2.decode(bytes); + return new FilterPushRpc(res); + } + + /** + * Encode the current FilterPushRpc object to bytes. + * @returns Uint8Array + */ + encode(): Uint8Array { + return proto.MessagePushV2.encode(this.proto); + } + + /** + * Get the WakuMessage from the FilterPushRpc object. + * @returns WakuMessage as a Uint8Array + */ + get wakuMessage(): Uint8Array | undefined { + return this.proto.wakuMessage?.payload; + } + + /** + * Get the pubsub topic from the FilterPushRpc object. + * @returns string + */ + get pubsubTopic(): string { + return this.proto.pubsubTopic; + } +} + +export type ContentFilter = { + contentTopic: string; +}; + +export class FilterSubscribeRpc { + public constructor(public proto: proto.FilterSubscribeRequest) {} + + static createSubscribeRequest( + pubsubTopic: string, + contentTopics: string[], + requestId?: string + ): FilterSubscribeRpc { + return new FilterSubscribeRpc({ + requestId: requestId || uuid(), + filterSubscribeType: + proto.FilterSubscribeRequest.FilterSubscribeType.SUBSCRIBE, + pubsubTopic, + contentTopics, + }); + } + + static createUnsubscribeRequest( + pubsubTopic: string, + contentTopics: string[], + requestId?: string + ): FilterSubscribeRpc { + return new FilterSubscribeRpc({ + requestId: requestId || uuid(), + filterSubscribeType: + proto.FilterSubscribeRequest.FilterSubscribeType.UNSUBSCRIBE, + pubsubTopic, + contentTopics, + }); + } + + static createUnsubscribeAllRequest( + pubsubTopic: string, + requestId?: string + ): FilterSubscribeRpc { + return new FilterSubscribeRpc({ + requestId: requestId || uuid(), + filterSubscribeType: + proto.FilterSubscribeRequest.FilterSubscribeType.UNSUBSCRIBE_ALL, + pubsubTopic, + contentTopics: [], + }); + } + + static createSubscriberPingRequest(requestId?: string): FilterSubscribeRpc { + return new FilterSubscribeRpc({ + requestId: requestId || uuid(), + filterSubscribeType: + proto.FilterSubscribeRequest.FilterSubscribeType.SUBSCRIBER_PING, + pubsubTopic: "", + contentTopics: [], + }); + } + + static decode(bytes: Uint8Array): FilterSubscribeRpc { + const res = proto.FilterSubscribeRequest.decode(bytes); + return new FilterSubscribeRpc(res); + } + + encode(): Uint8Array { + return proto.FilterSubscribeRequest.encode(this.proto); + } + + get filterSubscribeType(): proto.FilterSubscribeRequest.FilterSubscribeType { + return this.proto.filterSubscribeType; + } + + get requestId(): string { + return this.proto.requestId; + } + + get pubsubTopic(): string { + return this.proto.pubsubTopic; + } + + get contentTopics(): string[] { + return this.proto.contentTopics; + } +} + +export class FilterSubscribeResponse { + public constructor(public proto: proto.FilterSubscribeResponse) {} + + static decode(bytes: Uint8Array): FilterSubscribeResponse { + const res = proto.FilterSubscribeResponse.decode(bytes); + return new FilterSubscribeResponse(res); + } + + encode(): Uint8Array { + return proto.FilterSubscribeResponse.encode(this.proto); + } + + get statusCode(): number { + return this.proto.statusCode; + } + + get statusDesc(): string { + return this.proto.statusDesc; + } + + get requestId(): string { + return this.proto.requestId; + } +}