diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index 2bfafb3bf5..562a19efd8 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -1,7 +1,7 @@ import debug from "debug"; import lp from "it-length-prefixed"; import { pipe } from "it-pipe"; -import Libp2p from "libp2p"; +import Libp2p, { MuxedStream } from "libp2p"; import { Peer, PeerId } from "libp2p/src/peer-store"; import { WakuMessage as WakuMessageProto } from "../../proto/waku/v2/message"; @@ -34,6 +34,7 @@ export class WakuFilter { Uint8Array, { method?: DecryptionMethod; contentTopics?: string[] } >; + constructor(public libp2p: Libp2p) { this.libp2p.handle(FilterCodec, this.onRequest.bind(this)); this.subscriptions = {}; @@ -54,17 +55,15 @@ export class WakuFilter { undefined, true ); - const peer = await this.getPeer(); - const connection = this.libp2p.connectionManager.get(peer.id); - if (!connection) { - throw "Failed to get a connection to the peer"; - } - const { stream } = await connection.newStream(FilterCodec); + const peer = await this.getPeer(); + const stream = await this.newStream(peer); + try { await pipe([request.encode()], lp.encode(), stream); } catch (e) { log("Error subscribing", e); + throw e; } this.addCallback(request.requestId, callback); @@ -101,7 +100,7 @@ export class WakuFilter { ): Promise { const callback = this.subscriptions[requestId]; if (!callback) { - console.warn(`No callback registered for request ID ${requestId}`); + log(`No callback registered for request ID ${requestId}`); return; } @@ -118,7 +117,7 @@ export class WakuFilter { for (const message of messages) { const decoded = await WakuMessage.decodeProto(message, decryptionKeys); if (!decoded) { - console.error("Not able to decode message"); + log("Not able to decode message"); continue; } callback(decoded); @@ -145,28 +144,39 @@ export class WakuFilter { requestId, false ); - const connection = this.libp2p.connectionManager.get(peer.id); - if (!connection) { - throw "Failed to get a connection to the peer"; - } - const { stream } = await connection.newStream(FilterCodec); + const stream = await this.newStream(peer); try { await pipe([unsubscribeRequest.encode()], lp.encode(), stream.sink); } catch (e) { - console.error("Error unsubscribing", e); + log("Error unsubscribing", e); + throw e; } } + private async newStream(peer: Peer): Promise { + const connection = this.libp2p.connectionManager.get(peer.id); + if (!connection) { + throw new Error("Failed to get a connection to the peer"); + } + + const { stream } = await connection.newStream(FilterCodec); + return stream; + } + private async getPeer(peerId?: PeerId): Promise { let peer; if (peerId) { peer = await this.libp2p.peerStore.get(peerId); if (!peer) - throw `Failed to retrieve connection details for provided peer in peer store: ${peerId.toB58String()}`; + throw new Error( + `Failed to retrieve connection details for provided peer in peer store: ${peerId.toB58String()}` + ); } else { peer = await this.randomPeer; if (!peer) - throw "Failed to find known peer that registers waku filter protocol"; + throw new Error( + "Failed to find known peer that registers waku filter protocol" + ); } return peer; } @@ -186,7 +196,7 @@ export class WakuFilter { this.decryptionKeys.set(hexToBytes(key), options ?? {}); } - /**cursorV2Beta4 + /** * Delete a decryption key so that it cannot be used in future [[subscribe]] calls * * Strings must be in hex format.