wip: implementation for the new filter protocol

This commit is contained in:
danisharora099 2023-04-04 22:23:09 +05:30
parent eca56c054e
commit a7f6094911
No known key found for this signature in database
GPG Key ID: FBD2BF500037F135
2 changed files with 445 additions and 0 deletions

View File

@ -0,0 +1,275 @@
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type {
ActiveSubscriptions,
Callback,
IDecodedMessage,
IDecoder,
IFilter,
ProtocolCreateOptions,
ProtocolOptions,
} from "@waku/interfaces";
import { WakuMessage as WakuMessageProto } from "@waku/proto";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../../base_protocol.js";
import { DefaultPubSubTopic } from "../../constants.js";
import { groupByContentTopic } from "../../group_by.js";
import { toProtoMessage } from "../../to_proto_message.js";
import {
ContentFilter,
FilterPushRpc,
FilterSubscribeResponse,
FilterSubscribeRpc,
} from "./rpc.js";
export type UnsubscribeFunction = () => Promise<void>;
export type RequestID = string;
type Subscription<T extends IDecodedMessage> = {
decoders: IDecoder<T>[];
callback: Callback<T>;
pubSubTopic: string;
};
const FilterCodecs = {
SUBSCRIBE: "/vac/waku/filter-subscribe/2.0.0-beta1",
PUSH: "/vac/waku/filter-push/2.0.0-beta1",
};
const log = debug("waku:filter_v2");
/**
* Implements client side of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/).
*
* Note this currently only works in NodeJS when the Waku node is listening on a port, see:
* - https://github.com/status-im/go-waku/issues/245
* - https://github.com/status-im/nwaku/issues/948
*/
class FilterV2 extends BaseProtocol implements IFilter {
options: ProtocolCreateOptions;
private subscriptions: Map<RequestID, unknown>;
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(
Object.values(FilterCodecs),
libp2p.peerStore,
libp2p.getConnections.bind(libp2p)
);
this.options = options ?? {};
this.subscriptions = new Map();
this.libp2p
.handle(this.multicodecs, this.onRequest.bind(this))
.catch((e) => log("Failed to register filter protocol", e));
}
/**
* @param decoders Decoder or array of Decoders to use to decode messages, it also specifies the content topics.
* @param callback A function that will be called on each message returned by the filter.
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
* @returns Unsubscribe function that can be used to end the subscription.
*/
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
): Promise<UnsubscribeFunction> {
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
const { pubSubTopic = DefaultPubSubTopic } = this.options;
const contentTopics = Array.from(groupByContentTopic(decodersArray).keys());
const contentFilters = contentTopics.map((contentTopic) => ({
contentTopic,
}));
const request = FilterSubscribeRpc.createSubscribeRequest(
pubSubTopic,
contentFilters.map((contentFilter) => contentFilter.contentTopic),
undefined
);
const { requestId } = request;
const peer = await this.getPeer(opts?.peerId);
const stream = await this.newStream(peer);
try {
const res = await pipe(
[request.encode()],
lp.encode(),
stream,
lp.decode(),
async (source) => await all(source)
);
const bytes = new Uint8ArrayList();
res.forEach((chunk) => {
bytes.append(chunk);
});
const filterResponse = FilterSubscribeResponse.decode(bytes.slice());
const { statusCode, statusDesc, requestId } = filterResponse;
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter subscribe request failed with status code ${statusCode} and description ${statusDesc} for request ${requestId}`
);
}
log("response", res);
} catch (e) {
log(
"Error subscribing to peer ",
peer.id.toString(),
"for content topics",
contentTopics,
": ",
e
);
throw e;
}
const subscription: Subscription<T> = {
callback,
decoders: decodersArray,
pubSubTopic,
};
this.subscriptions.set(requestId, subscription);
return async () => {
await this.unsubscribe(pubSubTopic, contentFilters, requestId, peer);
this.subscriptions.delete(requestId);
};
}
public getActiveSubscriptions(): ActiveSubscriptions {
const map: ActiveSubscriptions = new Map();
const subscriptions = this.subscriptions as Map<
RequestID,
Subscription<IDecodedMessage>
>;
for (const item of subscriptions.values()) {
const values = map.get(item.pubSubTopic) || [];
const nextValues = item.decoders.map((decoder) => decoder.contentTopic);
map.set(item.pubSubTopic, [...values, ...nextValues]);
}
return map;
}
private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push");
try {
pipe(streamData.stream, lp.decode(), async (source) => {
for await (const bytes of source) {
const res = FilterPushRpc.decode(bytes.slice());
const { wakuMessage, pubsubTopic } = res;
if (!wakuMessage) {
// this should never happen as:
// Each `MessagePush` MUST contain one (and only one) `waku_message`.
// defined https://github.com/vacp2p/rfc/pull/562/files#
throw new Error("No waku message found");
}
await this.pushMessages();
// if (res.requestId && res.push?.messages?.length) {
// await this.pushMessages(res.requestId, res.push.messages);
// }
}
}).then(
() => {
log("Receiving pipe closed.");
},
(e) => {
log("Error with receiving pipe", e);
}
);
} catch (e) {
log("Error decoding message", e);
}
}
private async pushMessages<T extends IDecodedMessage>(
requestId: string,
messages: WakuMessageProto[]
): Promise<void> {
// const subscription = this.subscriptions.get(requestId) as
// | Subscription<T>
// | undefined;
// if (!subscription) {
// log(`No subscription locally registered for request ID ${requestId}`);
// return;
// }
const { decoders, callback, pubSubTopic } = subscription;
if (!decoders || !decoders.length) {
log(`No decoder registered for request ID ${requestId}`);
return;
}
for (const protoMessage of messages) {
const contentTopic = protoMessage.contentTopic;
if (!contentTopic) {
log("Message has no content topic, skipping");
return;
}
let didDecodeMsg = false;
// We don't want to wait for decoding failure, just attempt to decode
// all messages and do the call back on the one that works
// noinspection ES6MissingAwait
decoders.forEach(async (dec: IDecoder<T>) => {
if (didDecodeMsg) return;
const decoded = await dec.fromProtoObj(
pubSubTopic,
toProtoMessage(protoMessage)
);
if (!decoded) {
log("Not able to decode message");
return;
}
// This is just to prevent more decoding attempt
// TODO: Could be better if we were to abort promises
didDecodeMsg = Boolean(decoded);
await callback(decoded);
});
}
}
private async unsubscribe(
topic: string,
contentFilters: ContentFilter[],
requestId: string,
peer: Peer
): Promise<void> {
const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest(
topic,
contentFilters.map((contentFilter) => contentFilter.contentTopic),
requestId
);
const stream = await this.newStream(peer);
try {
await pipe([unsubscribeRequest.encode()], lp.encode(), stream.sink);
} catch (e) {
log("Error unsubscribing", e);
throw e;
}
}
}
export function wakuFilter(
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => IFilter {
return (libp2p: Libp2p) => new FilterV2(libp2p, init);
}

View File

@ -0,0 +1,170 @@
import { proto_filter_v2 as proto } from "@waku/proto";
import { v4 as uuid } from "uuid";
/**
* FilterPushRPC represents a message conforming to the Waku FilterPush protocol.
* Protocol documentation: https://rfc.vac.dev/spec/12/
*/
export class FilterPushRpc {
public constructor(public proto: proto.MessagePushV2) {}
/**
* Create a FilterPushRPC object with the provided parameters.
* @param wakuMessage The WakuMessage to be pushed.
* @param pubsubTopic The pubsub topic on which the message was published.
* @returns FilterPushRpc
*/
static create(wakuMessage: Uint8Array, pubsubTopic: string): FilterPushRpc {
const message = proto.WakuMessage.decode(wakuMessage);
return new FilterPushRpc({
wakuMessage: {
payload: wakuMessage,
contentTopic: message.contentTopic,
},
pubsubTopic: pubsubTopic,
});
}
/**
* Decode the given bytes into a FilterPushRpc object.
* @param bytes Uint8Array of bytes from a FilterPushRPC message.
* @returns FilterPushRpc
*/
static decode(bytes: Uint8Array): FilterPushRpc {
const res = proto.MessagePushV2.decode(bytes);
return new FilterPushRpc(res);
}
/**
* Encode the current FilterPushRpc object to bytes.
* @returns Uint8Array
*/
encode(): Uint8Array {
return proto.MessagePushV2.encode(this.proto);
}
/**
* Get the WakuMessage from the FilterPushRpc object.
* @returns WakuMessage as a Uint8Array
*/
get wakuMessage(): Uint8Array | undefined {
return this.proto.wakuMessage?.payload;
}
/**
* Get the pubsub topic from the FilterPushRpc object.
* @returns string
*/
get pubsubTopic(): string {
return this.proto.pubsubTopic;
}
}
export type ContentFilter = {
contentTopic: string;
};
export class FilterSubscribeRpc {
public constructor(public proto: proto.FilterSubscribeRequest) {}
static createSubscribeRequest(
pubsubTopic: string,
contentTopics: string[],
requestId?: string
): FilterSubscribeRpc {
return new FilterSubscribeRpc({
requestId: requestId || uuid(),
filterSubscribeType:
proto.FilterSubscribeRequest.FilterSubscribeType.SUBSCRIBE,
pubsubTopic,
contentTopics,
});
}
static createUnsubscribeRequest(
pubsubTopic: string,
contentTopics: string[],
requestId?: string
): FilterSubscribeRpc {
return new FilterSubscribeRpc({
requestId: requestId || uuid(),
filterSubscribeType:
proto.FilterSubscribeRequest.FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic,
contentTopics,
});
}
static createUnsubscribeAllRequest(
pubsubTopic: string,
requestId?: string
): FilterSubscribeRpc {
return new FilterSubscribeRpc({
requestId: requestId || uuid(),
filterSubscribeType:
proto.FilterSubscribeRequest.FilterSubscribeType.UNSUBSCRIBE_ALL,
pubsubTopic,
contentTopics: [],
});
}
static createSubscriberPingRequest(requestId?: string): FilterSubscribeRpc {
return new FilterSubscribeRpc({
requestId: requestId || uuid(),
filterSubscribeType:
proto.FilterSubscribeRequest.FilterSubscribeType.SUBSCRIBER_PING,
pubsubTopic: "",
contentTopics: [],
});
}
static decode(bytes: Uint8Array): FilterSubscribeRpc {
const res = proto.FilterSubscribeRequest.decode(bytes);
return new FilterSubscribeRpc(res);
}
encode(): Uint8Array {
return proto.FilterSubscribeRequest.encode(this.proto);
}
get filterSubscribeType(): proto.FilterSubscribeRequest.FilterSubscribeType {
return this.proto.filterSubscribeType;
}
get requestId(): string {
return this.proto.requestId;
}
get pubsubTopic(): string {
return this.proto.pubsubTopic;
}
get contentTopics(): string[] {
return this.proto.contentTopics;
}
}
export class FilterSubscribeResponse {
public constructor(public proto: proto.FilterSubscribeResponse) {}
static decode(bytes: Uint8Array): FilterSubscribeResponse {
const res = proto.FilterSubscribeResponse.decode(bytes);
return new FilterSubscribeResponse(res);
}
encode(): Uint8Array {
return proto.FilterSubscribeResponse.encode(this.proto);
}
get statusCode(): number {
return this.proto.statusCode;
}
get statusDesc(): string {
return this.proto.statusDesc;
}
get requestId(): string {
return this.proto.requestId;
}
}