diff --git a/CHANGELOG.md b/CHANGELOG.md index 7439ff2fd4..1fa0bcf34f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `waitForRemotePeer` now accepts a `timeoutMs` parameter that rejects the promise if it is reached. By default, no timeout is applied. +- **Experimental** support for the [Waku Filter](https://rfc.vac.dev/spec/12/) protocol (client side) added, currently only works in NodeJS. ### Changed diff --git a/proto/waku/v2/filter.proto b/proto/waku/v2/filter.proto new file mode 100644 index 0000000000..2ab26e2dee --- /dev/null +++ b/proto/waku/v2/filter.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package waku.v2; + +import "waku/v2/message.proto"; + +message FilterRequest { + bool subscribe = 1; + string topic = 2; + repeated ContentFilter content_filters = 3; + + message ContentFilter { + string content_topic = 1; + } +} + +message MessagePush { + repeated WakuMessage messages = 1; +} + +message FilterRPC { + string request_id = 1; + FilterRequest request = 2; + MessagePush push = 3; +} \ No newline at end of file diff --git a/src/lib/waku.node.spec.ts b/src/lib/waku.node.spec.ts index d854c634d4..134235dc0e 100644 --- a/src/lib/waku.node.spec.ts +++ b/src/lib/waku.node.spec.ts @@ -305,4 +305,27 @@ describe("Wait for remote peer / get peers", function () { expect(nimPeerId).to.not.be.undefined; expect(peers.includes(nimPeerId as string)).to.be.true; }); + + it("Filter", async function () { + this.timeout(20_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ filter: true }); + const multiAddrWithId = await nwaku.getMultiaddrWithId(); + + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.dial(multiAddrWithId); + await waku.waitForRemotePeer([Protocols.Filter]); + + const peers = []; + for await (const peer of waku.filter.peers) { + peers.push(peer.id.toB58String()); + } + + const nimPeerId = multiAddrWithId.getPeerId(); + + expect(nimPeerId).to.not.be.undefined; + expect(peers.includes(nimPeerId as string)).to.be.true; + }); }); diff --git a/src/lib/waku.ts b/src/lib/waku.ts index bd82fe3e76..3df90f8476 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -17,6 +17,7 @@ import { Multiaddr, multiaddr } from "multiaddr"; import PeerId from "peer-id"; import { Bootstrap, BootstrapOptions } from "./discovery"; +import { FilterCodec, WakuFilter } from "./waku_filter"; import { LightPushCodec, WakuLightPush } from "./waku_light_push"; import { DecryptionMethod, WakuMessage } from "./waku_message"; import { RelayCodecs, WakuRelay } from "./waku_relay"; @@ -39,6 +40,7 @@ export enum Protocols { Relay = "relay", Store = "store", LightPush = "lightpush", + Filter = "filter", } export interface CreateOptions { @@ -102,6 +104,7 @@ export class Waku { public libp2p: Libp2p; public relay: WakuRelay; public store: WakuStore; + public filter: WakuFilter; public lightPush: WakuLightPush; private pingKeepAliveTimers: { @@ -115,11 +118,13 @@ export class Waku { options: CreateOptions, libp2p: Libp2p, store: WakuStore, - lightPush: WakuLightPush + lightPush: WakuLightPush, + filter: WakuFilter ) { this.libp2p = libp2p; this.relay = libp2p.pubsub as unknown as WakuRelay; this.store = store; + this.filter = filter; this.lightPush = lightPush; this.pingKeepAliveTimers = {}; this.relayKeepAliveTimers = {}; @@ -220,10 +225,17 @@ export class Waku { pubSubTopic: options?.pubSubTopic, }); const wakuLightPush = new WakuLightPush(libp2p); + const wakuFilter = new WakuFilter(libp2p); await libp2p.start(); - return new Waku(options ? options : {}, libp2p, wakuStore, wakuLightPush); + return new Waku( + options ? options : {}, + libp2p, + wakuStore, + wakuLightPush, + wakuFilter + ); } /** @@ -253,6 +265,9 @@ export class Waku { if (_protocols.includes(Protocols.LightPush)) { codecs.push(LightPushCodec); } + if (_protocols.includes(Protocols.Filter)) { + codecs.push(FilterCodec); + } return this.libp2p.dialProtocol(peer, codecs); } @@ -297,6 +312,7 @@ export class Waku { ): void { this.relay.addDecryptionKey(key, options); this.store.addDecryptionKey(key, options); + this.filter.addDecryptionKey(key, options); } /** @@ -308,6 +324,7 @@ export class Waku { deleteDecryptionKey(key: Uint8Array | string): void { this.relay.deleteDecryptionKey(key); this.store.deleteDecryptionKey(key); + this.filter.deleteDecryptionKey(key); } /** @@ -381,6 +398,16 @@ export class Waku { promises.push(lightPushPromise); } + if (protocols.includes(Protocols.Filter)) { + const filterPromise = (async (): Promise => { + for await (const peer of this.filter.peers) { + dbg("Filter peer found", peer.id.toB58String()); + break; + } + })(); + promises.push(filterPromise); + } + if (timeoutMs) { await rejectOnTimeout( Promise.all(promises), diff --git a/src/lib/waku_filter/filter_rpc.ts b/src/lib/waku_filter/filter_rpc.ts new file mode 100644 index 0000000000..6ce3b7bc23 --- /dev/null +++ b/src/lib/waku_filter/filter_rpc.ts @@ -0,0 +1,58 @@ +import { Reader } from "protobufjs/minimal"; +import { v4 as uuid } from "uuid"; + +import * as proto from "../../proto/waku/v2/filter"; + +export type ContentFilter = { + contentTopic: string; +}; + +/** + * FilterRPC represents a message conforming to the Waku Filter protocol + */ +export class FilterRPC { + public constructor(public proto: proto.FilterRPC) {} + + static createRequest( + topic: string, + contentFilters: ContentFilter[], + requestId?: string, + subscribe = true + ): FilterRPC { + return new FilterRPC({ + requestId: requestId || uuid(), + request: { + subscribe, + topic, + contentFilters, + }, + push: undefined, + }); + } + + /** + * + * @param bytes Uint8Array of bytes from a FilterRPC message + * @returns FilterRPC + */ + static decode(bytes: Uint8Array): FilterRPC { + const res = proto.FilterRPC.decode(Reader.create(bytes)); + return new FilterRPC(res); + } + + /** + * Encode the current FilterRPC request to bytes + * @returns Uint8Array + */ + encode(): Uint8Array { + return proto.FilterRPC.encode(this.proto).finish(); + } + + get push(): proto.MessagePush | undefined { + return this.proto.push; + } + + get requestId(): string { + return this.proto.requestId; + } +} diff --git a/src/lib/waku_filter/index.node.spec.ts b/src/lib/waku_filter/index.node.spec.ts new file mode 100644 index 0000000000..a9dc05e3dc --- /dev/null +++ b/src/lib/waku_filter/index.node.spec.ts @@ -0,0 +1,106 @@ +import { expect } from "chai"; +import debug from "debug"; + +import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils"; +import { delay } from "../../test_utils/delay"; +import { Protocols, Waku } from "../waku"; +import { WakuMessage } from "../waku_message"; + +const log = debug("waku:test"); + +const TestContentTopic = "/test/1/waku-filter"; + +describe("Waku Filter", () => { + let waku: Waku; + let nwaku: Nwaku; + + afterEach(async function () { + !!nwaku && nwaku.stop(); + !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); + }); + + beforeEach(async function () { + this.timeout(10000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ filter: true }); + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, + }); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.waitForRemotePeer([Protocols.Filter, Protocols.Relay]); + }); + + it("creates a subscription", async function () { + this.timeout(10000); + + let messageCount = 0; + const messageText = "Filtering works!"; + const callback = (msg: WakuMessage): void => { + log("Got a message"); + messageCount++; + expect(msg.contentTopic).to.eq(TestContentTopic); + expect(msg.payloadAsUtf8).to.eq(messageText); + }; + await waku.filter.subscribe(callback, [TestContentTopic]); + const message = await WakuMessage.fromUtf8String( + messageText, + TestContentTopic + ); + await waku.relay.send(message); + while (messageCount === 0) { + await delay(250); + } + expect(messageCount).to.eq(1); + }); + + it("handles multiple messages", async function () { + this.timeout(10000); + + let messageCount = 0; + const callback = (msg: WakuMessage): void => { + messageCount++; + expect(msg.contentTopic).to.eq(TestContentTopic); + }; + await waku.filter.subscribe(callback, [TestContentTopic]); + await waku.relay.send( + await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic) + ); + await waku.relay.send( + await WakuMessage.fromUtf8String( + "Filtering still works!", + TestContentTopic + ) + ); + while (messageCount < 2) { + await delay(250); + } + expect(messageCount).to.eq(2); + }); + + it("unsubscribes", async function () { + let messageCount = 0; + const callback = (): void => { + messageCount++; + }; + const unsubscribe = await waku.filter.subscribe(callback, [ + TestContentTopic, + ]); + await waku.relay.send( + await WakuMessage.fromUtf8String( + "This should be received", + TestContentTopic + ) + ); + await delay(100); + await unsubscribe(); + await waku.relay.send( + await WakuMessage.fromUtf8String( + "This should not be received", + TestContentTopic + ) + ); + await delay(100); + expect(messageCount).to.eq(1); + }); +}); diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts new file mode 100644 index 0000000000..dbc09cc37e --- /dev/null +++ b/src/lib/waku_filter/index.ts @@ -0,0 +1,242 @@ +import debug from "debug"; +import lp from "it-length-prefixed"; +import { pipe } from "it-pipe"; +import Libp2p, { MuxedStream } from "libp2p"; +import { Peer, PeerId } from "libp2p/src/peer-store"; + +import { WakuMessage as WakuMessageProto } from "../../proto/waku/v2/message"; +import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; +import { hexToBytes } from "../utils"; +import { DefaultPubSubTopic } from "../waku"; +import { DecryptionMethod, WakuMessage } from "../waku_message"; + +import { ContentFilter, FilterRPC } from "./filter_rpc"; + +export const FilterCodec = "/vac/waku/filter/2.0.0-beta1"; + +const log = debug("waku:filter"); + +type FilterSubscriptionOpts = { + /** + * The Pubsub topic for the subscription + */ + pubsubTopic?: string; + /** + * Optionally specify a PeerId for the subscription. If not included, will use a random peer. + */ + peerId?: PeerId; +}; + +type FilterCallback = (msg: WakuMessage) => void | Promise; + +type UnsubscribeFunction = () => Promise; + +/** + * Implements client side of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/). + * + * Note this currently only works in NodeJS when the Waku node is listening on a port, see: + * - https://github.com/status-im/go-waku/issues/245 + * - https://github.com/status-im/nwaku/issues/948 + */ +export class WakuFilter { + private subscriptions: Map; + public decryptionKeys: Map< + Uint8Array, + { method?: DecryptionMethod; contentTopics?: string[] } + >; + + constructor(public libp2p: Libp2p) { + this.subscriptions = new Map(); + this.decryptionKeys = new Map(); + this.libp2p.handle(FilterCodec, this.onRequest.bind(this)); + } + + /** + * @param contentTopics Array of ContentTopics to subscribe to. If empty, no messages will be returned from the filter. + * @param callback A function that will be called on each message returned by the filter. + * @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to. + * @returns Unsubscribe function that can be used to end the subscription. + */ + async subscribe( + callback: FilterCallback, + contentTopics: string[], + opts?: FilterSubscriptionOpts + ): Promise { + const topic = opts?.pubsubTopic || DefaultPubSubTopic; + const contentFilters = contentTopics.map((contentTopic) => ({ + contentTopic, + })); + const request = FilterRPC.createRequest( + topic, + contentFilters, + undefined, + true + ); + + const peer = await this.getPeer(opts?.peerId); + const stream = await this.newStream(peer); + + try { + await pipe([request.encode()], lp.encode(), stream); + } catch (e) { + log( + "Error subscribing to peer ", + peer.id.toB58String(), + "for content topics", + contentTopics, + ": ", + e + ); + throw e; + } + + this.addCallback(request.requestId, callback); + + return async () => { + await this.unsubscribe(topic, contentFilters, request.requestId, peer); + this.removeCallback(request.requestId); + }; + } + + private async onRequest({ stream }: Libp2p.HandlerProps): Promise { + log("Receiving message push"); + try { + await pipe( + stream.source, + lp.decode(), + async (source: AsyncIterable) => { + for await (const bytes of source) { + const res = FilterRPC.decode(bytes.slice()); + if (res.push?.messages?.length) { + await this.pushMessages(res.requestId, res.push.messages); + } + } + } + ); + } catch (e) { + log("Error decoding message", e); + } + } + + private async pushMessages( + requestId: string, + messages: WakuMessageProto[] + ): Promise { + const callback = this.subscriptions.get(requestId); + if (!callback) { + log(`No callback registered for request ID ${requestId}`); + return; + } + + const decryptionKeys = Array.from(this.decryptionKeys).map( + ([key, { method, contentTopics }]) => { + return { + key, + method, + contentTopics, + }; + } + ); + + for (const message of messages) { + const decoded = await WakuMessage.decodeProto(message, decryptionKeys); + if (!decoded) { + log("Not able to decode message"); + continue; + } + callback(decoded); + } + } + + private addCallback(requestId: string, callback: FilterCallback): void { + this.subscriptions.set(requestId, callback); + } + + private removeCallback(requestId: string): void { + this.subscriptions.delete(requestId); + } + + private async unsubscribe( + topic: string, + contentFilters: ContentFilter[], + requestId: string, + peer: Peer + ): Promise { + const unsubscribeRequest = FilterRPC.createRequest( + topic, + contentFilters, + requestId, + false + ); + + const stream = await this.newStream(peer); + try { + await pipe([unsubscribeRequest.encode()], lp.encode(), stream.sink); + } catch (e) { + log("Error unsubscribing", e); + throw e; + } + } + + private async newStream(peer: Peer): Promise { + const connection = this.libp2p.connectionManager.get(peer.id); + if (!connection) { + throw new Error("Failed to get a connection to the peer"); + } + + const { stream } = await connection.newStream(FilterCodec); + return stream; + } + + private async getPeer(peerId?: PeerId): Promise { + let peer; + if (peerId) { + peer = await this.libp2p.peerStore.get(peerId); + if (!peer) { + throw new Error( + `Failed to retrieve connection details for provided peer in peer store: ${peerId.toB58String()}` + ); + } + } else { + peer = await this.randomPeer; + if (!peer) { + throw new Error( + "Failed to find known peer that registers waku filter protocol" + ); + } + } + return peer; + } + + /** + * Register a decryption key to attempt decryption of messages received in any + * subsequent [[subscribe]] call. This can either be a private key for + * asymmetric encryption or a symmetric key. [[WakuStore]] will attempt to + * decrypt messages using both methods. + * + * Strings must be in hex format. + */ + addDecryptionKey( + key: Uint8Array | string, + options?: { method?: DecryptionMethod; contentTopics?: string[] } + ): void { + this.decryptionKeys.set(hexToBytes(key), options ?? {}); + } + + /** + * Delete a decryption key so that it cannot be used in future [[subscribe]] calls + * + * Strings must be in hex format. + */ + deleteDecryptionKey(key: Uint8Array | string): void { + this.decryptionKeys.delete(hexToBytes(key)); + } + + get peers(): AsyncIterable { + return getPeersForProtocol(this.libp2p, [FilterCodec]); + } + + get randomPeer(): Promise { + return selectRandomPeer(this.peers); + } +} diff --git a/src/proto/waku/v2/filter.ts b/src/proto/waku/v2/filter.ts new file mode 100644 index 0000000000..97ab03a3d9 --- /dev/null +++ b/src/proto/waku/v2/filter.ts @@ -0,0 +1,358 @@ +/* eslint-disable */ +import Long from "long"; +import _m0 from "protobufjs/minimal"; +import { WakuMessage } from "../../waku/v2/message"; + +export const protobufPackage = "waku.v2"; + +export interface FilterRequest { + subscribe: boolean; + topic: string; + contentFilters: FilterRequest_ContentFilter[]; +} + +export interface FilterRequest_ContentFilter { + contentTopic: string; +} + +export interface MessagePush { + messages: WakuMessage[]; +} + +export interface FilterRPC { + requestId: string; + request: FilterRequest | undefined; + push: MessagePush | undefined; +} + +function createBaseFilterRequest(): FilterRequest { + return { subscribe: false, topic: "", contentFilters: [] }; +} + +export const FilterRequest = { + encode( + message: FilterRequest, + writer: _m0.Writer = _m0.Writer.create() + ): _m0.Writer { + if (message.subscribe === true) { + writer.uint32(8).bool(message.subscribe); + } + if (message.topic !== "") { + writer.uint32(18).string(message.topic); + } + for (const v of message.contentFilters) { + FilterRequest_ContentFilter.encode(v!, writer.uint32(26).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): FilterRequest { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseFilterRequest(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.subscribe = reader.bool(); + break; + case 2: + message.topic = reader.string(); + break; + case 3: + message.contentFilters.push( + FilterRequest_ContentFilter.decode(reader, reader.uint32()) + ); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): FilterRequest { + return { + subscribe: isSet(object.subscribe) ? Boolean(object.subscribe) : false, + topic: isSet(object.topic) ? String(object.topic) : "", + contentFilters: Array.isArray(object?.contentFilters) + ? object.contentFilters.map((e: any) => + FilterRequest_ContentFilter.fromJSON(e) + ) + : [], + }; + }, + + toJSON(message: FilterRequest): unknown { + const obj: any = {}; + message.subscribe !== undefined && (obj.subscribe = message.subscribe); + message.topic !== undefined && (obj.topic = message.topic); + if (message.contentFilters) { + obj.contentFilters = message.contentFilters.map((e) => + e ? FilterRequest_ContentFilter.toJSON(e) : undefined + ); + } else { + obj.contentFilters = []; + } + return obj; + }, + + fromPartial, I>>( + object: I + ): FilterRequest { + const message = createBaseFilterRequest(); + message.subscribe = object.subscribe ?? false; + message.topic = object.topic ?? ""; + message.contentFilters = + object.contentFilters?.map((e) => + FilterRequest_ContentFilter.fromPartial(e) + ) || []; + return message; + }, +}; + +function createBaseFilterRequest_ContentFilter(): FilterRequest_ContentFilter { + return { contentTopic: "" }; +} + +export const FilterRequest_ContentFilter = { + encode( + message: FilterRequest_ContentFilter, + writer: _m0.Writer = _m0.Writer.create() + ): _m0.Writer { + if (message.contentTopic !== "") { + writer.uint32(10).string(message.contentTopic); + } + return writer; + }, + + decode( + input: _m0.Reader | Uint8Array, + length?: number + ): FilterRequest_ContentFilter { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseFilterRequest_ContentFilter(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.contentTopic = reader.string(); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): FilterRequest_ContentFilter { + return { + contentTopic: isSet(object.contentTopic) + ? String(object.contentTopic) + : "", + }; + }, + + toJSON(message: FilterRequest_ContentFilter): unknown { + const obj: any = {}; + message.contentTopic !== undefined && + (obj.contentTopic = message.contentTopic); + return obj; + }, + + fromPartial, I>>( + object: I + ): FilterRequest_ContentFilter { + const message = createBaseFilterRequest_ContentFilter(); + message.contentTopic = object.contentTopic ?? ""; + return message; + }, +}; + +function createBaseMessagePush(): MessagePush { + return { messages: [] }; +} + +export const MessagePush = { + encode( + message: MessagePush, + writer: _m0.Writer = _m0.Writer.create() + ): _m0.Writer { + for (const v of message.messages) { + WakuMessage.encode(v!, writer.uint32(10).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): MessagePush { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseMessagePush(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.messages.push(WakuMessage.decode(reader, reader.uint32())); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): MessagePush { + return { + messages: Array.isArray(object?.messages) + ? object.messages.map((e: any) => WakuMessage.fromJSON(e)) + : [], + }; + }, + + toJSON(message: MessagePush): unknown { + const obj: any = {}; + if (message.messages) { + obj.messages = message.messages.map((e) => + e ? WakuMessage.toJSON(e) : undefined + ); + } else { + obj.messages = []; + } + return obj; + }, + + fromPartial, I>>( + object: I + ): MessagePush { + const message = createBaseMessagePush(); + message.messages = + object.messages?.map((e) => WakuMessage.fromPartial(e)) || []; + return message; + }, +}; + +function createBaseFilterRPC(): FilterRPC { + return { requestId: "", request: undefined, push: undefined }; +} + +export const FilterRPC = { + encode( + message: FilterRPC, + writer: _m0.Writer = _m0.Writer.create() + ): _m0.Writer { + if (message.requestId !== "") { + writer.uint32(10).string(message.requestId); + } + if (message.request !== undefined) { + FilterRequest.encode(message.request, writer.uint32(18).fork()).ldelim(); + } + if (message.push !== undefined) { + MessagePush.encode(message.push, writer.uint32(26).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): FilterRPC { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseFilterRPC(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.requestId = reader.string(); + break; + case 2: + message.request = FilterRequest.decode(reader, reader.uint32()); + break; + case 3: + message.push = MessagePush.decode(reader, reader.uint32()); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): FilterRPC { + return { + requestId: isSet(object.requestId) ? String(object.requestId) : "", + request: isSet(object.request) + ? FilterRequest.fromJSON(object.request) + : undefined, + push: isSet(object.push) ? MessagePush.fromJSON(object.push) : undefined, + }; + }, + + toJSON(message: FilterRPC): unknown { + const obj: any = {}; + message.requestId !== undefined && (obj.requestId = message.requestId); + message.request !== undefined && + (obj.request = message.request + ? FilterRequest.toJSON(message.request) + : undefined); + message.push !== undefined && + (obj.push = message.push ? MessagePush.toJSON(message.push) : undefined); + return obj; + }, + + fromPartial, I>>( + object: I + ): FilterRPC { + const message = createBaseFilterRPC(); + message.requestId = object.requestId ?? ""; + message.request = + object.request !== undefined && object.request !== null + ? FilterRequest.fromPartial(object.request) + : undefined; + message.push = + object.push !== undefined && object.push !== null + ? MessagePush.fromPartial(object.push) + : undefined; + return message; + }, +}; + +type Builtin = + | Date + | Function + | Uint8Array + | string + | number + | boolean + | undefined; + +export type DeepPartial = T extends Builtin + ? T + : T extends Long + ? string | number | Long + : T extends Array + ? Array> + : T extends ReadonlyArray + ? ReadonlyArray> + : T extends {} + ? { [K in keyof T]?: DeepPartial } + : Partial; + +type KeysOfUnion = T extends T ? keyof T : never; +export type Exact = P extends Builtin + ? P + : P & { [K in keyof P]: Exact } & Record< + Exclude>, + never + >; + +if (_m0.util.Long !== Long) { + _m0.util.Long = Long as any; + _m0.configure(); +} + +function isSet(value: any): boolean { + return value !== null && value !== undefined; +}