Code cleanup

This commit is contained in:
Nicholas Molnar 2022-05-16 10:30:27 -07:00 committed by Franck Royer
parent 41f01c6d60
commit 971d080ab5
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
1 changed files with 28 additions and 18 deletions

View File

@ -1,7 +1,7 @@
import debug from "debug"; import debug from "debug";
import lp from "it-length-prefixed"; import lp from "it-length-prefixed";
import { pipe } from "it-pipe"; import { pipe } from "it-pipe";
import Libp2p from "libp2p"; import Libp2p, { MuxedStream } from "libp2p";
import { Peer, PeerId } from "libp2p/src/peer-store"; import { Peer, PeerId } from "libp2p/src/peer-store";
import { WakuMessage as WakuMessageProto } from "../../proto/waku/v2/message"; import { WakuMessage as WakuMessageProto } from "../../proto/waku/v2/message";
@ -34,6 +34,7 @@ export class WakuFilter {
Uint8Array, Uint8Array,
{ method?: DecryptionMethod; contentTopics?: string[] } { method?: DecryptionMethod; contentTopics?: string[] }
>; >;
constructor(public libp2p: Libp2p) { constructor(public libp2p: Libp2p) {
this.libp2p.handle(FilterCodec, this.onRequest.bind(this)); this.libp2p.handle(FilterCodec, this.onRequest.bind(this));
this.subscriptions = {}; this.subscriptions = {};
@ -54,17 +55,15 @@ export class WakuFilter {
undefined, undefined,
true true
); );
const peer = await this.getPeer();
const connection = this.libp2p.connectionManager.get(peer.id);
if (!connection) {
throw "Failed to get a connection to the peer";
}
const { stream } = await connection.newStream(FilterCodec); const peer = await this.getPeer();
const stream = await this.newStream(peer);
try { try {
await pipe([request.encode()], lp.encode(), stream); await pipe([request.encode()], lp.encode(), stream);
} catch (e) { } catch (e) {
log("Error subscribing", e); log("Error subscribing", e);
throw e;
} }
this.addCallback(request.requestId, callback); this.addCallback(request.requestId, callback);
@ -101,7 +100,7 @@ export class WakuFilter {
): Promise<void> { ): Promise<void> {
const callback = this.subscriptions[requestId]; const callback = this.subscriptions[requestId];
if (!callback) { if (!callback) {
console.warn(`No callback registered for request ID ${requestId}`); log(`No callback registered for request ID ${requestId}`);
return; return;
} }
@ -118,7 +117,7 @@ export class WakuFilter {
for (const message of messages) { for (const message of messages) {
const decoded = await WakuMessage.decodeProto(message, decryptionKeys); const decoded = await WakuMessage.decodeProto(message, decryptionKeys);
if (!decoded) { if (!decoded) {
console.error("Not able to decode message"); log("Not able to decode message");
continue; continue;
} }
callback(decoded); callback(decoded);
@ -145,28 +144,39 @@ export class WakuFilter {
requestId, requestId,
false false
); );
const connection = this.libp2p.connectionManager.get(peer.id); const stream = await this.newStream(peer);
if (!connection) {
throw "Failed to get a connection to the peer";
}
const { stream } = await connection.newStream(FilterCodec);
try { try {
await pipe([unsubscribeRequest.encode()], lp.encode(), stream.sink); await pipe([unsubscribeRequest.encode()], lp.encode(), stream.sink);
} catch (e) { } catch (e) {
console.error("Error unsubscribing", e); log("Error unsubscribing", e);
throw e;
} }
} }
private async newStream(peer: Peer): Promise<MuxedStream> {
const connection = this.libp2p.connectionManager.get(peer.id);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
const { stream } = await connection.newStream(FilterCodec);
return stream;
}
private async getPeer(peerId?: PeerId): Promise<Peer> { private async getPeer(peerId?: PeerId): Promise<Peer> {
let peer; let peer;
if (peerId) { if (peerId) {
peer = await this.libp2p.peerStore.get(peerId); peer = await this.libp2p.peerStore.get(peerId);
if (!peer) if (!peer)
throw `Failed to retrieve connection details for provided peer in peer store: ${peerId.toB58String()}`; throw new Error(
`Failed to retrieve connection details for provided peer in peer store: ${peerId.toB58String()}`
);
} else { } else {
peer = await this.randomPeer; peer = await this.randomPeer;
if (!peer) if (!peer)
throw "Failed to find known peer that registers waku filter protocol"; throw new Error(
"Failed to find known peer that registers waku filter protocol"
);
} }
return peer; return peer;
} }
@ -186,7 +196,7 @@ export class WakuFilter {
this.decryptionKeys.set(hexToBytes(key), options ?? {}); this.decryptionKeys.set(hexToBytes(key), options ?? {});
} }
/**cursorV2Beta4 /**
* Delete a decryption key so that it cannot be used in future [[subscribe]] calls * Delete a decryption key so that it cannot be used in future [[subscribe]] calls
* *
* Strings must be in hex format. * Strings must be in hex format.