diff --git a/package-lock.json b/package-lock.json index 180b26c59c..f4c6e91e23 100644 --- a/package-lock.json +++ b/package-lock.json @@ -26,6 +26,7 @@ "debug": "^4.3.4", "dns-query": "^0.11.1", "hi-base32": "^0.5.1", + "it-all": "^1.0.6", "it-length-prefixed": "^7.0.1", "it-pipe": "^2.0.3", "js-sha3": "^0.8.0", diff --git a/package.json b/package.json index d6f28a890c..e87d9813ea 100644 --- a/package.json +++ b/package.json @@ -83,6 +83,7 @@ "debug": "^4.3.4", "dns-query": "^0.11.1", "hi-base32": "^0.5.1", + "it-all": "^1.0.6", "it-length-prefixed": "^7.0.1", "it-pipe": "^2.0.3", "js-sha3": "^0.8.0", diff --git a/src/lib/waku_filter/index.node.spec.ts b/src/lib/waku_filter/index.node.spec.ts index c077b41964..ff15aadcf2 100644 --- a/src/lib/waku_filter/index.node.spec.ts +++ b/src/lib/waku_filter/index.node.spec.ts @@ -22,14 +22,14 @@ describe("Waku Filter", () => { beforeEach(async function () { this.timeout(10000); nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ filter: true }); + await nwaku.start({ filter: true, lightpush: true }); waku = await createWaku({ staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForRemotePeer([Protocols.Filter, Protocols.Relay]); + await waku.waitForRemotePeer([Protocols.Filter, Protocols.LightPush]); }); it("creates a subscription", async function () { @@ -48,7 +48,7 @@ describe("Waku Filter", () => { messageText, TestContentTopic ); - await waku.relay.send(message); + await waku.lightPush.push(message); while (messageCount === 0) { await delay(250); } @@ -64,10 +64,10 @@ describe("Waku Filter", () => { expect(msg.contentTopic).to.eq(TestContentTopic); }; await waku.filter.subscribe(callback, [TestContentTopic]); - await waku.relay.send( + await waku.lightPush.push( await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic) ); - await waku.relay.send( + await waku.lightPush.push( await WakuMessage.fromUtf8String( "Filtering still works!", TestContentTopic @@ -87,7 +87,7 @@ describe("Waku Filter", () => { const unsubscribe = await waku.filter.subscribe(callback, [ TestContentTopic, ]); - await waku.relay.send( + await waku.lightPush.push( await WakuMessage.fromUtf8String( "This should be received", TestContentTopic @@ -95,7 +95,7 @@ describe("Waku Filter", () => { ); await delay(100); await unsubscribe(); - await waku.relay.send( + await waku.lightPush.push( await WakuMessage.fromUtf8String( "This should not be received", TestContentTopic diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index d58a02e7aa..ab80118e2f 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -1,6 +1,7 @@ import type { PeerId } from "@libp2p/interface-peer-id"; import type { Peer } from "@libp2p/interface-peer-store"; import debug from "debug"; +import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; import { Libp2p } from "libp2p"; @@ -49,7 +50,9 @@ export class WakuFilter { constructor(public libp2p: Libp2p) { this.subscriptions = new Map(); this.decryptionKeys = new Map(); - this.libp2p.handle(FilterCodec, this.onRequest.bind(this)); + this.libp2p + .handle(FilterCodec, this.onRequest.bind(this)) + .catch((e) => log("Failed to register filter protocol", e)); } /** @@ -84,7 +87,15 @@ export class WakuFilter { const stream = await this.newStream(peer); try { - await pipe([request.encode()], lp.encode(), stream); + const res = await pipe( + [request.encode()], + lp.encode(), + stream, + lp.decode(), + async (source) => await all(source) + ); + + log("response", res); } catch (e) { log( "Error subscribing to peer ", @@ -109,14 +120,21 @@ export class WakuFilter { private onRequest({ stream }: any): void { log("Receiving message push"); try { - pipe(stream.source, lp.decode(), async (source) => { + pipe(stream, lp.decode(), async (source) => { for await (const bytes of source) { const res = FilterRPC.decode(bytes.slice()); if (res.requestId && res.push?.messages?.length) { await this.pushMessages(res.requestId, res.push.messages); } } - }); + }).then( + () => { + log("Receiving pipe closed."); + }, + (e) => { + log("Error with receiving pipe", e); + } + ); } catch (e) { log("Error decoding message", e); } diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index 7c0ed36982..00ac0b35ff 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -1,5 +1,6 @@ import type { PeerId } from "@libp2p/interface-peer-id"; import type { Peer } from "@libp2p/interface-peer-store"; +import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; import { Libp2p } from "libp2p"; @@ -71,17 +72,12 @@ export class WakuLightPush { ? opts.pubSubTopic : this.pubSubTopic; const query = PushRPC.createRequest(message, pubSubTopic); - const res: Uint8Array[] = []; - await pipe( + const res = await pipe( [query.encode()], lp.encode(), stream, lp.decode(), - async (source) => { - for await (const chunk of source) { - res.push(chunk.slice()); - } - } + async (source) => await all(source) ); try { const bytes = concat(res); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 1d9e94eee8..2f4a5d1bac 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -1,6 +1,7 @@ import type { PeerId } from "@libp2p/interface-peer-id"; import { Peer } from "@libp2p/interface-peer-store"; import debug from "debug"; +import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; import { Libp2p } from "libp2p"; @@ -205,17 +206,12 @@ export class WakuStore { const historyRpcQuery = HistoryRPC.createQuery(queryOpts); dbg("Querying store peer", connections[0].remoteAddr.toString()); - const res: Uint8Array[] = []; - await pipe( + const res = await pipe( [historyRpcQuery.encode()], lp.encode(), stream, lp.decode(), - async (source) => { - for await (const chunk of source) { - res.push(chunk.slice()); - } - } + async (source) => await all(source) ); const bytes = concat(res); const reply = historyRpcQuery.decode(bytes);