Finish implementation

This commit is contained in:
Nicholas Molnar 2022-04-19 21:51:44 -07:00 committed by Franck Royer
parent 691de1a194
commit 4734e4b7c7
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
5 changed files with 355 additions and 2 deletions

View File

@ -17,6 +17,8 @@ import { Multiaddr, multiaddr } from "multiaddr";
import PeerId from "peer-id";
import { Bootstrap, BootstrapOptions } from "./discovery";
import { WakuFilter } from "./waku_filter";
import { FilterCodec } from "./waku_filter";
import { LightPushCodec, WakuLightPush } from "./waku_light_push";
import { DecryptionMethod, WakuMessage } from "./waku_message";
import { RelayCodecs, WakuRelay } from "./waku_relay";
@ -39,6 +41,7 @@ export enum Protocols {
Relay = "relay",
Store = "store",
LightPush = "lightpush",
Filter = "filter",
}
export interface CreateOptions {
@ -102,6 +105,7 @@ export class Waku {
public libp2p: Libp2p;
public relay: WakuRelay;
public store: WakuStore;
public filter: WakuFilter;
public lightPush: WakuLightPush;
private pingKeepAliveTimers: {
@ -115,11 +119,13 @@ export class Waku {
options: CreateOptions,
libp2p: Libp2p,
store: WakuStore,
lightPush: WakuLightPush
lightPush: WakuLightPush,
filter: WakuFilter
) {
this.libp2p = libp2p;
this.relay = libp2p.pubsub as unknown as WakuRelay;
this.store = store;
this.filter = filter;
this.lightPush = lightPush;
this.pingKeepAliveTimers = {};
this.relayKeepAliveTimers = {};
@ -220,10 +226,17 @@ export class Waku {
pubSubTopic: options?.pubSubTopic,
});
const wakuLightPush = new WakuLightPush(libp2p);
const wakuFilter = new WakuFilter(libp2p);
await libp2p.start();
return new Waku(options ? options : {}, libp2p, wakuStore, wakuLightPush);
return new Waku(
options ? options : {},
libp2p,
wakuStore,
wakuLightPush,
wakuFilter
);
}
/**
@ -381,6 +394,16 @@ export class Waku {
promises.push(lightPushPromise);
}
if (protocols.includes(Protocols.Filter)) {
const filterPromise = (async (): Promise<void> => {
for await (const peer of this.filter.peers) {
dbg("Filter peer found", peer.id.toB58String());
break;
}
})();
promises.push(filterPromise);
}
if (timeoutMs) {
await rejectOnTimeout(
Promise.all(promises),

View File

@ -0,0 +1,46 @@
import { Reader } from "protobufjs/minimal";
import { v4 as uuid } from "uuid";
import * as proto from "../../proto/waku/v2/filter";
export type ContentFilter = {
contentTopic: string;
};
export class FilterRPC {
public constructor(public proto: proto.FilterRPC) {}
static createRequest(
topic: string,
contentFilters: ContentFilter[],
requestId?: string,
subscribe = true
): FilterRPC {
return new FilterRPC({
requestId: requestId || uuid(),
request: {
subscribe,
topic,
contentFilters,
},
push: undefined,
});
}
static decode(bytes: Uint8Array): FilterRPC {
const res = proto.FilterRPC.decode(Reader.create(bytes));
return new FilterRPC(res);
}
encode(): Uint8Array {
return proto.FilterRPC.encode(this.proto).finish();
}
get push(): proto.MessagePush | undefined {
return this.proto.push;
}
get requestId(): string {
return this.proto.requestId;
}
}

View File

@ -0,0 +1,113 @@
import { expect } from "chai";
import debug from "debug";
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils";
import { delay } from "../../test_utils/delay";
import { Protocols, Waku } from "../waku";
import { WakuMessage } from "../waku_message";
const log = debug("waku:test");
const TestContentTopic = "/test/1/waku-filter";
describe("Waku Filter", () => {
let waku: Waku;
let nwaku: Nwaku;
afterEach(async function () {
!!nwaku && nwaku.stop();
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
});
beforeEach(async function () {
this.timeout(10000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ filter: true });
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
});
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.Filter, Protocols.Relay]);
});
it("creates a subscription", async function () {
this.timeout(10000);
let messageCount = 0;
const messageText = "Filtering works!";
const callback = (msg: WakuMessage): void => {
log("Got a message");
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
expect(msg.payloadAsUtf8).to.eq(messageText);
};
await waku.filter.subscribe(
{ contentTopics: [TestContentTopic] },
callback
);
const message = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic
);
waku.relay.send(message);
while (messageCount === 0) {
await delay(250);
}
expect(messageCount).to.eq(1);
});
it("handles multiple messages", async function () {
this.timeout(10000);
let messageCount = 0;
const callback = (msg: WakuMessage): void => {
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
};
await waku.filter.subscribe(
{ contentTopics: [TestContentTopic] },
callback
);
waku.relay.send(
await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic)
);
waku.relay.send(
await WakuMessage.fromUtf8String(
"Filtering still works!",
TestContentTopic
)
);
while (messageCount < 2) {
await delay(250);
}
expect(messageCount).to.eq(2);
});
it("unsubscribes", async function () {
let messageCount = 0;
const callback = (): void => {
messageCount++;
};
const unsubscribe = await waku.filter.subscribe(
{ contentTopics: [TestContentTopic] },
callback
);
waku.relay.send(
await WakuMessage.fromUtf8String(
"This should be received",
TestContentTopic
)
);
await delay(100);
await unsubscribe();
waku.relay.send(
await WakuMessage.fromUtf8String(
"This should not be received",
TestContentTopic
)
);
await delay(100);
expect(messageCount).to.eq(1);
});
});

View File

@ -0,0 +1,170 @@
import debug from "debug";
// import concat from "it-concat";
import lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import Libp2p from "libp2p";
import { Peer, PeerId } from "libp2p/src/peer-store";
import { WakuMessage as WakuMessageProto } from "../../proto/waku/v2/message";
import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
import { DefaultPubSubTopic } from "../waku";
import { DecryptionMethod, WakuMessage } from "../waku_message/index";
import { ContentFilter, FilterRPC } from "./filter_rpc";
export const FilterCodec = "/vac/waku/filter/2.0.0-beta1";
const log = debug("waku:filter");
type FilterSubscriptionOpts = {
topic?: string;
peerId?: PeerId;
contentTopics: string[];
};
type FilterCallback = (msg: WakuMessage) => void | Promise<void>;
type UnsubscribeFunction = () => Promise<void>;
export class WakuFilter {
private subscriptions: {
[requestId: string]: FilterCallback;
};
public decryptionKeys: Map<
Uint8Array,
{ method?: DecryptionMethod; contentTopics?: string[] }
>;
constructor(public libp2p: Libp2p) {
this.libp2p.handle(FilterCodec, this.onRequest.bind(this));
this.subscriptions = {};
this.decryptionKeys = new Map();
}
async subscribe(
opts: FilterSubscriptionOpts,
callback: FilterCallback
): Promise<UnsubscribeFunction> {
const topic = opts.topic || DefaultPubSubTopic;
const contentFilters = opts.contentTopics.map((contentTopic) => ({
contentTopic,
}));
const request = FilterRPC.createRequest(
topic,
contentFilters,
undefined,
true
);
const peer = await this.getPeer();
const connection = this.libp2p.connectionManager.get(peer.id);
if (!connection) {
throw "Failed to get a connection to the peer";
}
const { stream } = await connection.newStream(FilterCodec);
try {
await pipe([request.encode()], lp.encode(), stream.sink);
} catch (e) {
log("Error subscribing", e);
}
this.addCallback(request.requestId, callback);
return async () => {
await this.unsubscribe(topic, contentFilters, request.requestId, peer);
this.removeCallback(request.requestId);
};
}
private async onRequest({ stream }: Libp2p.HandlerProps): Promise<void> {
log("Receiving message push");
try {
await pipe(
stream.source,
lp.decode(),
async (source: AsyncIterable<Buffer>) => {
for await (const bytes of source) {
const res = FilterRPC.decode(bytes.slice());
if (res.push?.messages?.length) {
await this.pushMessages(res.requestId, res.push.messages);
}
}
}
);
} catch (e) {
log("Error decoding message", e);
}
}
private async pushMessages(
requestId: string,
messages: WakuMessageProto[]
): Promise<void> {
const callback = this.subscriptions[requestId];
if (!callback) {
console.warn(`No callback registered for request ID ${requestId}`);
return;
}
for (const message of messages) {
const decoded = await WakuMessage.decodeProto(message, []);
if (!decoded) {
console.error("Not able to decode message");
continue;
}
callback(decoded);
}
}
private addCallback(requestId: string, callback: FilterCallback): void {
this.subscriptions[requestId] = callback;
}
private removeCallback(requestId: string): void {
delete this.subscriptions[requestId];
}
private async unsubscribe(
topic: string,
contentFilters: ContentFilter[],
requestId: string,
peer: Peer
): Promise<void> {
const unsubscribeRequest = FilterRPC.createRequest(
topic,
contentFilters,
requestId,
false
);
const connection = this.libp2p.connectionManager.get(peer.id);
if (!connection) {
throw "Failed to get a connection to the peer";
}
const { stream } = await connection.newStream(FilterCodec);
try {
await pipe([unsubscribeRequest.encode()], lp.encode(), stream.sink);
} catch (e) {
console.error("Error unsubscribing", e);
}
}
private async getPeer(peerId?: PeerId): Promise<Peer> {
let peer;
if (peerId) {
peer = await this.libp2p.peerStore.get(peerId);
if (!peer)
throw `Failed to retrieve connection details for provided peer in peer store: ${peerId.toB58String()}`;
} else {
peer = await this.randomPeer;
if (!peer)
throw "Failed to find known peer that registers waku store protocol";
}
return peer;
}
get peers(): AsyncIterable<Peer> {
return getPeersForProtocol(this.libp2p, [FilterCodec]);
}
get randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(this.peers);
}
}

View File

@ -33,6 +33,7 @@ export interface Args {
relay?: boolean;
rpc?: boolean;
rpcAdmin?: boolean;
filter?: boolean;
nodekey?: string;
portsShift?: number;
logLevel?: LogLevel;