Merge pull request #762 from status-im/waku-filter

Implement Waku Filter protocol
This commit is contained in:
Franck R 2022-05-27 11:09:50 +10:00 committed by GitHub
commit ac56778950
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 842 additions and 2 deletions

View File

@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `waitForRemotePeer` now accepts a `timeoutMs` parameter that rejects the promise if it is reached. By default, no timeout is applied.
- **Experimental** support for the [Waku Filter](https://rfc.vac.dev/spec/12/) protocol (client side) added, currently only works in NodeJS.
### Changed

View File

@ -0,0 +1,25 @@
syntax = "proto3";
package waku.v2;
import "waku/v2/message.proto";
message FilterRequest {
bool subscribe = 1;
string topic = 2;
repeated ContentFilter content_filters = 3;
message ContentFilter {
string content_topic = 1;
}
}
message MessagePush {
repeated WakuMessage messages = 1;
}
message FilterRPC {
string request_id = 1;
FilterRequest request = 2;
MessagePush push = 3;
}

View File

@ -305,4 +305,27 @@ describe("Wait for remote peer / get peers", function () {
expect(nimPeerId).to.not.be.undefined;
expect(peers.includes(nimPeerId as string)).to.be.true;
});
it("Filter", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ filter: true });
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
});
await waku.dial(multiAddrWithId);
await waku.waitForRemotePeer([Protocols.Filter]);
const peers = [];
for await (const peer of waku.filter.peers) {
peers.push(peer.id.toB58String());
}
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.includes(nimPeerId as string)).to.be.true;
});
});

View File

@ -17,6 +17,7 @@ import { Multiaddr, multiaddr } from "multiaddr";
import PeerId from "peer-id";
import { Bootstrap, BootstrapOptions } from "./discovery";
import { FilterCodec, WakuFilter } from "./waku_filter";
import { LightPushCodec, WakuLightPush } from "./waku_light_push";
import { DecryptionMethod, WakuMessage } from "./waku_message";
import { RelayCodecs, WakuRelay } from "./waku_relay";
@ -39,6 +40,7 @@ export enum Protocols {
Relay = "relay",
Store = "store",
LightPush = "lightpush",
Filter = "filter",
}
export interface CreateOptions {
@ -102,6 +104,7 @@ export class Waku {
public libp2p: Libp2p;
public relay: WakuRelay;
public store: WakuStore;
public filter: WakuFilter;
public lightPush: WakuLightPush;
private pingKeepAliveTimers: {
@ -115,11 +118,13 @@ export class Waku {
options: CreateOptions,
libp2p: Libp2p,
store: WakuStore,
lightPush: WakuLightPush
lightPush: WakuLightPush,
filter: WakuFilter
) {
this.libp2p = libp2p;
this.relay = libp2p.pubsub as unknown as WakuRelay;
this.store = store;
this.filter = filter;
this.lightPush = lightPush;
this.pingKeepAliveTimers = {};
this.relayKeepAliveTimers = {};
@ -220,10 +225,17 @@ export class Waku {
pubSubTopic: options?.pubSubTopic,
});
const wakuLightPush = new WakuLightPush(libp2p);
const wakuFilter = new WakuFilter(libp2p);
await libp2p.start();
return new Waku(options ? options : {}, libp2p, wakuStore, wakuLightPush);
return new Waku(
options ? options : {},
libp2p,
wakuStore,
wakuLightPush,
wakuFilter
);
}
/**
@ -253,6 +265,9 @@ export class Waku {
if (_protocols.includes(Protocols.LightPush)) {
codecs.push(LightPushCodec);
}
if (_protocols.includes(Protocols.Filter)) {
codecs.push(FilterCodec);
}
return this.libp2p.dialProtocol(peer, codecs);
}
@ -297,6 +312,7 @@ export class Waku {
): void {
this.relay.addDecryptionKey(key, options);
this.store.addDecryptionKey(key, options);
this.filter.addDecryptionKey(key, options);
}
/**
@ -308,6 +324,7 @@ export class Waku {
deleteDecryptionKey(key: Uint8Array | string): void {
this.relay.deleteDecryptionKey(key);
this.store.deleteDecryptionKey(key);
this.filter.deleteDecryptionKey(key);
}
/**
@ -381,6 +398,16 @@ export class Waku {
promises.push(lightPushPromise);
}
if (protocols.includes(Protocols.Filter)) {
const filterPromise = (async (): Promise<void> => {
for await (const peer of this.filter.peers) {
dbg("Filter peer found", peer.id.toB58String());
break;
}
})();
promises.push(filterPromise);
}
if (timeoutMs) {
await rejectOnTimeout(
Promise.all(promises),

View File

@ -0,0 +1,58 @@
import { Reader } from "protobufjs/minimal";
import { v4 as uuid } from "uuid";
import * as proto from "../../proto/waku/v2/filter";
export type ContentFilter = {
contentTopic: string;
};
/**
* FilterRPC represents a message conforming to the Waku Filter protocol
*/
export class FilterRPC {
public constructor(public proto: proto.FilterRPC) {}
static createRequest(
topic: string,
contentFilters: ContentFilter[],
requestId?: string,
subscribe = true
): FilterRPC {
return new FilterRPC({
requestId: requestId || uuid(),
request: {
subscribe,
topic,
contentFilters,
},
push: undefined,
});
}
/**
*
* @param bytes Uint8Array of bytes from a FilterRPC message
* @returns FilterRPC
*/
static decode(bytes: Uint8Array): FilterRPC {
const res = proto.FilterRPC.decode(Reader.create(bytes));
return new FilterRPC(res);
}
/**
* Encode the current FilterRPC request to bytes
* @returns Uint8Array
*/
encode(): Uint8Array {
return proto.FilterRPC.encode(this.proto).finish();
}
get push(): proto.MessagePush | undefined {
return this.proto.push;
}
get requestId(): string {
return this.proto.requestId;
}
}

View File

@ -0,0 +1,106 @@
import { expect } from "chai";
import debug from "debug";
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils";
import { delay } from "../../test_utils/delay";
import { Protocols, Waku } from "../waku";
import { WakuMessage } from "../waku_message";
const log = debug("waku:test");
const TestContentTopic = "/test/1/waku-filter";
describe("Waku Filter", () => {
let waku: Waku;
let nwaku: Nwaku;
afterEach(async function () {
!!nwaku && nwaku.stop();
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
});
beforeEach(async function () {
this.timeout(10000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ filter: true });
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
});
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.Filter, Protocols.Relay]);
});
it("creates a subscription", async function () {
this.timeout(10000);
let messageCount = 0;
const messageText = "Filtering works!";
const callback = (msg: WakuMessage): void => {
log("Got a message");
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
expect(msg.payloadAsUtf8).to.eq(messageText);
};
await waku.filter.subscribe(callback, [TestContentTopic]);
const message = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic
);
await waku.relay.send(message);
while (messageCount === 0) {
await delay(250);
}
expect(messageCount).to.eq(1);
});
it("handles multiple messages", async function () {
this.timeout(10000);
let messageCount = 0;
const callback = (msg: WakuMessage): void => {
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
};
await waku.filter.subscribe(callback, [TestContentTopic]);
await waku.relay.send(
await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic)
);
await waku.relay.send(
await WakuMessage.fromUtf8String(
"Filtering still works!",
TestContentTopic
)
);
while (messageCount < 2) {
await delay(250);
}
expect(messageCount).to.eq(2);
});
it("unsubscribes", async function () {
let messageCount = 0;
const callback = (): void => {
messageCount++;
};
const unsubscribe = await waku.filter.subscribe(callback, [
TestContentTopic,
]);
await waku.relay.send(
await WakuMessage.fromUtf8String(
"This should be received",
TestContentTopic
)
);
await delay(100);
await unsubscribe();
await waku.relay.send(
await WakuMessage.fromUtf8String(
"This should not be received",
TestContentTopic
)
);
await delay(100);
expect(messageCount).to.eq(1);
});
});

View File

@ -0,0 +1,242 @@
import debug from "debug";
import lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import Libp2p, { MuxedStream } from "libp2p";
import { Peer, PeerId } from "libp2p/src/peer-store";
import { WakuMessage as WakuMessageProto } from "../../proto/waku/v2/message";
import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
import { hexToBytes } from "../utils";
import { DefaultPubSubTopic } from "../waku";
import { DecryptionMethod, WakuMessage } from "../waku_message";
import { ContentFilter, FilterRPC } from "./filter_rpc";
export const FilterCodec = "/vac/waku/filter/2.0.0-beta1";
const log = debug("waku:filter");
type FilterSubscriptionOpts = {
/**
* The Pubsub topic for the subscription
*/
pubsubTopic?: string;
/**
* Optionally specify a PeerId for the subscription. If not included, will use a random peer.
*/
peerId?: PeerId;
};
type FilterCallback = (msg: WakuMessage) => void | Promise<void>;
type UnsubscribeFunction = () => Promise<void>;
/**
* Implements client side of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/).
*
* Note this currently only works in NodeJS when the Waku node is listening on a port, see:
* - https://github.com/status-im/go-waku/issues/245
* - https://github.com/status-im/nwaku/issues/948
*/
export class WakuFilter {
private subscriptions: Map<string, FilterCallback>;
public decryptionKeys: Map<
Uint8Array,
{ method?: DecryptionMethod; contentTopics?: string[] }
>;
constructor(public libp2p: Libp2p) {
this.subscriptions = new Map();
this.decryptionKeys = new Map();
this.libp2p.handle(FilterCodec, this.onRequest.bind(this));
}
/**
* @param contentTopics Array of ContentTopics to subscribe to. If empty, no messages will be returned from the filter.
* @param callback A function that will be called on each message returned by the filter.
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
* @returns Unsubscribe function that can be used to end the subscription.
*/
async subscribe(
callback: FilterCallback,
contentTopics: string[],
opts?: FilterSubscriptionOpts
): Promise<UnsubscribeFunction> {
const topic = opts?.pubsubTopic || DefaultPubSubTopic;
const contentFilters = contentTopics.map((contentTopic) => ({
contentTopic,
}));
const request = FilterRPC.createRequest(
topic,
contentFilters,
undefined,
true
);
const peer = await this.getPeer(opts?.peerId);
const stream = await this.newStream(peer);
try {
await pipe([request.encode()], lp.encode(), stream);
} catch (e) {
log(
"Error subscribing to peer ",
peer.id.toB58String(),
"for content topics",
contentTopics,
": ",
e
);
throw e;
}
this.addCallback(request.requestId, callback);
return async () => {
await this.unsubscribe(topic, contentFilters, request.requestId, peer);
this.removeCallback(request.requestId);
};
}
private async onRequest({ stream }: Libp2p.HandlerProps): Promise<void> {
log("Receiving message push");
try {
await pipe(
stream.source,
lp.decode(),
async (source: AsyncIterable<Buffer>) => {
for await (const bytes of source) {
const res = FilterRPC.decode(bytes.slice());
if (res.push?.messages?.length) {
await this.pushMessages(res.requestId, res.push.messages);
}
}
}
);
} catch (e) {
log("Error decoding message", e);
}
}
private async pushMessages(
requestId: string,
messages: WakuMessageProto[]
): Promise<void> {
const callback = this.subscriptions.get(requestId);
if (!callback) {
log(`No callback registered for request ID ${requestId}`);
return;
}
const decryptionKeys = Array.from(this.decryptionKeys).map(
([key, { method, contentTopics }]) => {
return {
key,
method,
contentTopics,
};
}
);
for (const message of messages) {
const decoded = await WakuMessage.decodeProto(message, decryptionKeys);
if (!decoded) {
log("Not able to decode message");
continue;
}
callback(decoded);
}
}
private addCallback(requestId: string, callback: FilterCallback): void {
this.subscriptions.set(requestId, callback);
}
private removeCallback(requestId: string): void {
this.subscriptions.delete(requestId);
}
private async unsubscribe(
topic: string,
contentFilters: ContentFilter[],
requestId: string,
peer: Peer
): Promise<void> {
const unsubscribeRequest = FilterRPC.createRequest(
topic,
contentFilters,
requestId,
false
);
const stream = await this.newStream(peer);
try {
await pipe([unsubscribeRequest.encode()], lp.encode(), stream.sink);
} catch (e) {
log("Error unsubscribing", e);
throw e;
}
}
private async newStream(peer: Peer): Promise<MuxedStream> {
const connection = this.libp2p.connectionManager.get(peer.id);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
const { stream } = await connection.newStream(FilterCodec);
return stream;
}
private async getPeer(peerId?: PeerId): Promise<Peer> {
let peer;
if (peerId) {
peer = await this.libp2p.peerStore.get(peerId);
if (!peer) {
throw new Error(
`Failed to retrieve connection details for provided peer in peer store: ${peerId.toB58String()}`
);
}
} else {
peer = await this.randomPeer;
if (!peer) {
throw new Error(
"Failed to find known peer that registers waku filter protocol"
);
}
}
return peer;
}
/**
* Register a decryption key to attempt decryption of messages received in any
* subsequent [[subscribe]] call. This can either be a private key for
* asymmetric encryption or a symmetric key. [[WakuStore]] will attempt to
* decrypt messages using both methods.
*
* Strings must be in hex format.
*/
addDecryptionKey(
key: Uint8Array | string,
options?: { method?: DecryptionMethod; contentTopics?: string[] }
): void {
this.decryptionKeys.set(hexToBytes(key), options ?? {});
}
/**
* Delete a decryption key so that it cannot be used in future [[subscribe]] calls
*
* Strings must be in hex format.
*/
deleteDecryptionKey(key: Uint8Array | string): void {
this.decryptionKeys.delete(hexToBytes(key));
}
get peers(): AsyncIterable<Peer> {
return getPeersForProtocol(this.libp2p, [FilterCodec]);
}
get randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(this.peers);
}
}

358
src/proto/waku/v2/filter.ts Normal file
View File

@ -0,0 +1,358 @@
/* eslint-disable */
import Long from "long";
import _m0 from "protobufjs/minimal";
import { WakuMessage } from "../../waku/v2/message";
export const protobufPackage = "waku.v2";
export interface FilterRequest {
subscribe: boolean;
topic: string;
contentFilters: FilterRequest_ContentFilter[];
}
export interface FilterRequest_ContentFilter {
contentTopic: string;
}
export interface MessagePush {
messages: WakuMessage[];
}
export interface FilterRPC {
requestId: string;
request: FilterRequest | undefined;
push: MessagePush | undefined;
}
function createBaseFilterRequest(): FilterRequest {
return { subscribe: false, topic: "", contentFilters: [] };
}
export const FilterRequest = {
encode(
message: FilterRequest,
writer: _m0.Writer = _m0.Writer.create()
): _m0.Writer {
if (message.subscribe === true) {
writer.uint32(8).bool(message.subscribe);
}
if (message.topic !== "") {
writer.uint32(18).string(message.topic);
}
for (const v of message.contentFilters) {
FilterRequest_ContentFilter.encode(v!, writer.uint32(26).fork()).ldelim();
}
return writer;
},
decode(input: _m0.Reader | Uint8Array, length?: number): FilterRequest {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseFilterRequest();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.subscribe = reader.bool();
break;
case 2:
message.topic = reader.string();
break;
case 3:
message.contentFilters.push(
FilterRequest_ContentFilter.decode(reader, reader.uint32())
);
break;
default:
reader.skipType(tag & 7);
break;
}
}
return message;
},
fromJSON(object: any): FilterRequest {
return {
subscribe: isSet(object.subscribe) ? Boolean(object.subscribe) : false,
topic: isSet(object.topic) ? String(object.topic) : "",
contentFilters: Array.isArray(object?.contentFilters)
? object.contentFilters.map((e: any) =>
FilterRequest_ContentFilter.fromJSON(e)
)
: [],
};
},
toJSON(message: FilterRequest): unknown {
const obj: any = {};
message.subscribe !== undefined && (obj.subscribe = message.subscribe);
message.topic !== undefined && (obj.topic = message.topic);
if (message.contentFilters) {
obj.contentFilters = message.contentFilters.map((e) =>
e ? FilterRequest_ContentFilter.toJSON(e) : undefined
);
} else {
obj.contentFilters = [];
}
return obj;
},
fromPartial<I extends Exact<DeepPartial<FilterRequest>, I>>(
object: I
): FilterRequest {
const message = createBaseFilterRequest();
message.subscribe = object.subscribe ?? false;
message.topic = object.topic ?? "";
message.contentFilters =
object.contentFilters?.map((e) =>
FilterRequest_ContentFilter.fromPartial(e)
) || [];
return message;
},
};
function createBaseFilterRequest_ContentFilter(): FilterRequest_ContentFilter {
return { contentTopic: "" };
}
export const FilterRequest_ContentFilter = {
encode(
message: FilterRequest_ContentFilter,
writer: _m0.Writer = _m0.Writer.create()
): _m0.Writer {
if (message.contentTopic !== "") {
writer.uint32(10).string(message.contentTopic);
}
return writer;
},
decode(
input: _m0.Reader | Uint8Array,
length?: number
): FilterRequest_ContentFilter {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseFilterRequest_ContentFilter();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.contentTopic = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return message;
},
fromJSON(object: any): FilterRequest_ContentFilter {
return {
contentTopic: isSet(object.contentTopic)
? String(object.contentTopic)
: "",
};
},
toJSON(message: FilterRequest_ContentFilter): unknown {
const obj: any = {};
message.contentTopic !== undefined &&
(obj.contentTopic = message.contentTopic);
return obj;
},
fromPartial<I extends Exact<DeepPartial<FilterRequest_ContentFilter>, I>>(
object: I
): FilterRequest_ContentFilter {
const message = createBaseFilterRequest_ContentFilter();
message.contentTopic = object.contentTopic ?? "";
return message;
},
};
function createBaseMessagePush(): MessagePush {
return { messages: [] };
}
export const MessagePush = {
encode(
message: MessagePush,
writer: _m0.Writer = _m0.Writer.create()
): _m0.Writer {
for (const v of message.messages) {
WakuMessage.encode(v!, writer.uint32(10).fork()).ldelim();
}
return writer;
},
decode(input: _m0.Reader | Uint8Array, length?: number): MessagePush {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseMessagePush();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.messages.push(WakuMessage.decode(reader, reader.uint32()));
break;
default:
reader.skipType(tag & 7);
break;
}
}
return message;
},
fromJSON(object: any): MessagePush {
return {
messages: Array.isArray(object?.messages)
? object.messages.map((e: any) => WakuMessage.fromJSON(e))
: [],
};
},
toJSON(message: MessagePush): unknown {
const obj: any = {};
if (message.messages) {
obj.messages = message.messages.map((e) =>
e ? WakuMessage.toJSON(e) : undefined
);
} else {
obj.messages = [];
}
return obj;
},
fromPartial<I extends Exact<DeepPartial<MessagePush>, I>>(
object: I
): MessagePush {
const message = createBaseMessagePush();
message.messages =
object.messages?.map((e) => WakuMessage.fromPartial(e)) || [];
return message;
},
};
function createBaseFilterRPC(): FilterRPC {
return { requestId: "", request: undefined, push: undefined };
}
export const FilterRPC = {
encode(
message: FilterRPC,
writer: _m0.Writer = _m0.Writer.create()
): _m0.Writer {
if (message.requestId !== "") {
writer.uint32(10).string(message.requestId);
}
if (message.request !== undefined) {
FilterRequest.encode(message.request, writer.uint32(18).fork()).ldelim();
}
if (message.push !== undefined) {
MessagePush.encode(message.push, writer.uint32(26).fork()).ldelim();
}
return writer;
},
decode(input: _m0.Reader | Uint8Array, length?: number): FilterRPC {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseFilterRPC();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.requestId = reader.string();
break;
case 2:
message.request = FilterRequest.decode(reader, reader.uint32());
break;
case 3:
message.push = MessagePush.decode(reader, reader.uint32());
break;
default:
reader.skipType(tag & 7);
break;
}
}
return message;
},
fromJSON(object: any): FilterRPC {
return {
requestId: isSet(object.requestId) ? String(object.requestId) : "",
request: isSet(object.request)
? FilterRequest.fromJSON(object.request)
: undefined,
push: isSet(object.push) ? MessagePush.fromJSON(object.push) : undefined,
};
},
toJSON(message: FilterRPC): unknown {
const obj: any = {};
message.requestId !== undefined && (obj.requestId = message.requestId);
message.request !== undefined &&
(obj.request = message.request
? FilterRequest.toJSON(message.request)
: undefined);
message.push !== undefined &&
(obj.push = message.push ? MessagePush.toJSON(message.push) : undefined);
return obj;
},
fromPartial<I extends Exact<DeepPartial<FilterRPC>, I>>(
object: I
): FilterRPC {
const message = createBaseFilterRPC();
message.requestId = object.requestId ?? "";
message.request =
object.request !== undefined && object.request !== null
? FilterRequest.fromPartial(object.request)
: undefined;
message.push =
object.push !== undefined && object.push !== null
? MessagePush.fromPartial(object.push)
: undefined;
return message;
},
};
type Builtin =
| Date
| Function
| Uint8Array
| string
| number
| boolean
| undefined;
export type DeepPartial<T> = T extends Builtin
? T
: T extends Long
? string | number | Long
: T extends Array<infer U>
? Array<DeepPartial<U>>
: T extends ReadonlyArray<infer U>
? ReadonlyArray<DeepPartial<U>>
: T extends {}
? { [K in keyof T]?: DeepPartial<T[K]> }
: Partial<T>;
type KeysOfUnion<T> = T extends T ? keyof T : never;
export type Exact<P, I extends P> = P extends Builtin
? P
: P & { [K in keyof P]: Exact<P[K], I[K]> } & Record<
Exclude<keyof I, KeysOfUnion<P>>,
never
>;
if (_m0.util.Long !== Long) {
_m0.util.Long = Long as any;
_m0.configure();
}
function isSet(value: any): boolean {
return value !== null && value !== undefined;
}