diff --git a/src/index.ts b/src/index.ts index 5788df3c26..e06c6c38fe 100644 --- a/src/index.ts +++ b/src/index.ts @@ -13,6 +13,8 @@ export * as enr from "./lib/enr"; export * as utils from "./lib/utils"; +export { waitForRemotePeer } from "./lib/wait_for_remote_peer"; + export * as waku from "./lib/waku"; export { Waku, Protocols } from "./lib/waku"; diff --git a/src/lib/enr/enr.node.spec.ts b/src/lib/enr/enr.node.spec.ts index 475129b9e4..81c45c1c24 100644 --- a/src/lib/enr/enr.node.spec.ts +++ b/src/lib/enr/enr.node.spec.ts @@ -1,6 +1,7 @@ import { expect } from "chai"; import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils"; +import { waitForRemotePeer } from "../wait_for_remote_peer"; import { createWaku, Protocols, Waku } from "../waku"; import { ENR } from "./enr"; @@ -30,7 +31,7 @@ describe("ENR Interop: nwaku", function () { }); await waku.start(); await waku.dial(multiAddrWithId); - await waku.waitForRemotePeer([Protocols.Relay]); + await waitForRemotePeer(waku, [Protocols.Relay]); const nwakuInfo = await nwaku.info(); const nimPeerId = await nwaku.getPeerId(); @@ -62,7 +63,7 @@ describe("ENR Interop: nwaku", function () { }); await waku.start(); await waku.dial(multiAddrWithId); - await waku.waitForRemotePeer([Protocols.Relay]); + await waitForRemotePeer(waku, [Protocols.Relay]); const nwakuInfo = await nwaku.info(); const nimPeerId = await nwaku.getPeerId(); @@ -94,7 +95,7 @@ describe("ENR Interop: nwaku", function () { }); await waku.start(); await waku.dial(multiAddrWithId); - await waku.waitForRemotePeer([Protocols.Relay]); + await waitForRemotePeer(waku, [Protocols.Relay]); const nwakuInfo = await nwaku.info(); const nimPeerId = await nwaku.getPeerId(); diff --git a/src/lib/wait_for_remote_peer.node.spec.ts b/src/lib/wait_for_remote_peer.node.spec.ts new file mode 100644 index 0000000000..57b0a7c357 --- /dev/null +++ b/src/lib/wait_for_remote_peer.node.spec.ts @@ -0,0 +1,171 @@ +import { expect } from "chai"; + +import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../test_utils"; +import { delay } from "../test_utils/delay"; + +import { waitForRemotePeer } from "./wait_for_remote_peer"; +import { createWaku, Protocols, Waku } from "./waku"; + +describe("Wait for remote peer", function () { + let waku: Waku; + let nwaku: Nwaku | undefined; + + afterEach(async function () { + if (nwaku) { + nwaku.stop(); + nwaku = undefined; + } + !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); + }); + + it("Relay - dialed first", async function () { + this.timeout(20_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start(); + const multiAddrWithId = await nwaku.getMultiaddrWithId(); + + waku = await createWaku({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(multiAddrWithId); + await delay(1000); + await waitForRemotePeer(waku, [Protocols.Relay]); + const peers = waku.relay.getPeers(); + const nimPeerId = multiAddrWithId.getPeerId(); + + expect(nimPeerId).to.not.be.undefined; + expect(peers).to.includes(nimPeerId); + }); + + it("Relay - dialed after", async function () { + this.timeout(20_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start(); + const multiAddrWithId = await nwaku.getMultiaddrWithId(); + + waku = await createWaku({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + + const waitPromise = waitForRemotePeer(waku, [Protocols.Relay]); + await delay(1000); + await waku.dial(multiAddrWithId); + await waitPromise; + + // TODO: Should getMeshPeers be used instead? + const peers = waku.relay.getPeers(); + const nimPeerId = multiAddrWithId.getPeerId(); + + expect(nimPeerId).to.not.be.undefined; + expect(peers).includes(nimPeerId); + }); + + it("Relay - times out", function (done) { + this.timeout(5000); + createWaku({ + staticNoiseKey: NOISE_KEY_1, + }) + .then((waku) => waku.start().then(() => waku)) + .then((waku) => { + waitForRemotePeer(waku, [Protocols.Relay], 200).then( + () => { + throw "Promise expected to reject on time out"; + }, + (reason) => { + expect(reason).to.eq("Timed out waiting for a remote peer."); + done(); + } + ); + }); + }); + + it("Store - dialed first", async function () { + this.timeout(20_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ persistMessages: true }); + const multiAddrWithId = await nwaku.getMultiaddrWithId(); + + waku = await createWaku({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(multiAddrWithId); + await delay(1000); + await waitForRemotePeer(waku, [Protocols.Store]); + + const peers = (await waku.store.peers()).map((peer) => peer.id.toString()); + const nimPeerId = multiAddrWithId.getPeerId(); + + expect(nimPeerId).to.not.be.undefined; + expect(peers.includes(nimPeerId as string)).to.be.true; + }); + + it("Store - dialed after - with timeout", async function () { + this.timeout(20_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ persistMessages: true }); + const multiAddrWithId = await nwaku.getMultiaddrWithId(); + + waku = await createWaku({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + const waitPromise = waitForRemotePeer(waku, [Protocols.Store], 2000); + await delay(1000); + await waku.dial(multiAddrWithId); + await waitPromise; + + const peers = (await waku.store.peers()).map((peer) => peer.id.toString()); + + const nimPeerId = multiAddrWithId.getPeerId(); + + expect(nimPeerId).to.not.be.undefined; + expect(peers.includes(nimPeerId as string)).to.be.true; + }); + + it("LightPush", async function () { + this.timeout(20_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ lightpush: true }); + const multiAddrWithId = await nwaku.getMultiaddrWithId(); + + waku = await createWaku({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(multiAddrWithId); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const peers = (await waku.lightPush.peers()).map((peer) => + peer.id.toString() + ); + + const nimPeerId = multiAddrWithId.getPeerId(); + + expect(nimPeerId).to.not.be.undefined; + expect(peers.includes(nimPeerId as string)).to.be.true; + }); + + it("Filter", async function () { + this.timeout(20_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ filter: true }); + const multiAddrWithId = await nwaku.getMultiaddrWithId(); + + waku = await createWaku({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(multiAddrWithId); + await waitForRemotePeer(waku, [Protocols.Filter]); + + const peers = (await waku.filter.peers()).map((peer) => peer.id.toString()); + + const nimPeerId = multiAddrWithId.getPeerId(); + + expect(nimPeerId).to.not.be.undefined; + expect(peers.includes(nimPeerId as string)).to.be.true; + }); +}); diff --git a/src/lib/wait_for_remote_peer.ts b/src/lib/wait_for_remote_peer.ts new file mode 100644 index 0000000000..1c040c1902 --- /dev/null +++ b/src/lib/wait_for_remote_peer.ts @@ -0,0 +1,144 @@ +import { PeerProtocolsChangeData } from "@libp2p/interface-peer-store"; +import debug from "debug"; + +import { StoreCodecs } from "./constants"; +import { Protocols, Waku } from "./waku"; +import { FilterCodec } from "./waku_filter"; +import { LightPushCodec } from "./waku_light_push"; + +const log = debug("waku:wait-for-remote-peer"); + +/** + * Wait for a remote peer to be ready given the passed protocols. + * Useful when using the [[CreateOptions.bootstrap]] with [[Waku.create]]. + * + * @param waku The Waku Node + * @param protocols The protocols that need to be enabled by remote peers. + * @param timeoutMs A timeout value in milliseconds.. + * + * @returns A promise that **resolves** if all desired protocols are fulfilled by + * remote nodes, **rejects** if the timeoutMs is reached. + * + * @default Remote peer must have Waku Relay enabled and no time out is applied. + */ +export async function waitForRemotePeer( + waku: Waku, + protocols?: Protocols[], + timeoutMs?: number +): Promise { + protocols = protocols ?? [Protocols.Relay]; + + const promises: Promise[] = []; + + if (protocols.includes(Protocols.Relay)) { + const peers = waku.relay.getMeshPeers(waku.relay.pubSubTopic); + + if (peers.length == 0) { + // No peer yet available, wait for a subscription + const promise = new Promise((resolve) => { + // TODO: Remove listeners once done + waku.relay.addEventListener("subscription-change", () => { + // Remote peer subscribed to topic, now wait for a heartbeat + // so that the mesh is updated and the remote peer added to it + waku.relay.addEventListener("gossipsub:heartbeat", () => resolve()); + }); + }); + promises.push(promise); + } + } + + // TODO: This can be factored in one helper function + // Probably need to add a "string" protocol to each class to make it easier + if (protocols.includes(Protocols.Store)) { + const storePromise = (async (): Promise => { + const peers = await waku.store.peers(); + + if (peers.length) { + log("Store peer found: ", peers[0].id.toString()); + return; + } + + await new Promise((resolve) => { + const cb = (evt: CustomEvent): void => { + for (const codec of Object.values(StoreCodecs)) { + if (evt.detail.protocols.includes(codec)) { + log("Resolving for", StoreCodecs, evt.detail.protocols); + waku.libp2p.peerStore.removeEventListener("change:protocols", cb); + resolve(); + break; + } + } + }; + waku.libp2p.peerStore.addEventListener("change:protocols", cb); + }); + })(); + promises.push(storePromise); + } + + if (protocols.includes(Protocols.LightPush)) { + const lightPushPromise = (async (): Promise => { + const peers = await waku.lightPush.peers(); + + if (peers.length) { + log("Light Push peer found: ", peers[0].id.toString()); + return; + } + + await new Promise((resolve) => { + const cb = (evt: CustomEvent): void => { + if (evt.detail.protocols.includes(LightPushCodec)) { + log("Resolving for", LightPushCodec, evt.detail.protocols); + waku.libp2p.peerStore.removeEventListener("change:protocols", cb); + resolve(); + } + }; + waku.libp2p.peerStore.addEventListener("change:protocols", cb); + }); + })(); + promises.push(lightPushPromise); + } + + if (protocols.includes(Protocols.Filter)) { + const filterPromise = (async (): Promise => { + const peers = await waku.filter.peers(); + + if (peers.length) { + log("Filter peer found: ", peers[0].id.toString()); + return; + } + + await new Promise((resolve) => { + const cb = (evt: CustomEvent): void => { + if (evt.detail.protocols.includes(FilterCodec)) { + log("Resolving for", FilterCodec, evt.detail.protocols); + waku.libp2p.peerStore.removeEventListener("change:protocols", cb); + resolve(); + } + }; + waku.libp2p.peerStore.addEventListener("change:protocols", cb); + }); + })(); + promises.push(filterPromise); + } + + if (timeoutMs) { + await rejectOnTimeout( + Promise.all(promises), + timeoutMs, + "Timed out waiting for a remote peer." + ); + } else { + await Promise.all(promises); + } +} + +const awaitTimeout = (ms: number, rejectReason: string): Promise => + new Promise((_resolve, reject) => setTimeout(() => reject(rejectReason), ms)); + +async function rejectOnTimeout( + promise: Promise, + timeoutMs: number, + rejectReason: string +): Promise { + await Promise.race([promise, awaitTimeout(timeoutMs, rejectReason)]); +} diff --git a/src/lib/waku.node.spec.ts b/src/lib/waku.node.spec.ts index 40986c2c45..76fae32f26 100644 --- a/src/lib/waku.node.spec.ts +++ b/src/lib/waku.node.spec.ts @@ -7,9 +7,9 @@ import { NOISE_KEY_2, Nwaku, } from "../test_utils/"; -import { delay } from "../test_utils/delay"; import { generateSymmetricKey } from "./crypto"; +import { waitForRemotePeer } from "./wait_for_remote_peer"; import { createWaku, Protocols, Waku } from "./waku"; import { WakuMessage } from "./waku_message"; @@ -36,7 +36,7 @@ describe("Waku Dial [node only]", function () { }); await waku.start(); await waku.dial(multiAddrWithId); - await waku.waitForRemotePeer([Protocols.Relay]); + await waitForRemotePeer(waku, [Protocols.Relay]); const nimPeerId = await nwaku.getPeerId(); expect(await waku.libp2p.peerStore.has(nimPeerId)).to.be.true; @@ -135,8 +135,8 @@ describe("Decryption Keys", () => { ); await Promise.all([ - waku1.waitForRemotePeer([Protocols.Relay]), - waku2.waitForRemotePeer([Protocols.Relay]), + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), ]); }); @@ -177,167 +177,3 @@ describe("Decryption Keys", () => { expect(receivedMsg.timestamp?.valueOf()).to.eq(messageTimestamp.valueOf()); }); }); - -describe("Wait for remote peer / get peers", function () { - let waku: Waku; - let nwaku: Nwaku | undefined; - - afterEach(async function () { - if (nwaku) { - nwaku.stop(); - nwaku = undefined; - } - !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); - }); - - it("Relay - dialed first", async function () { - this.timeout(20_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start(); - const multiAddrWithId = await nwaku.getMultiaddrWithId(); - - waku = await createWaku({ - staticNoiseKey: NOISE_KEY_1, - }); - await waku.start(); - await waku.dial(multiAddrWithId); - await delay(1000); - await waku.waitForRemotePeer([Protocols.Relay]); - const peers = waku.relay.getPeers(); - const nimPeerId = multiAddrWithId.getPeerId(); - - expect(nimPeerId).to.not.be.undefined; - expect(peers).to.includes(nimPeerId); - }); - - it("Relay - dialed after", async function () { - this.timeout(20_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start(); - const multiAddrWithId = await nwaku.getMultiaddrWithId(); - - waku = await createWaku({ - staticNoiseKey: NOISE_KEY_1, - }); - await waku.start(); - - const waitPromise = waku.waitForRemotePeer([Protocols.Relay]); - await delay(1000); - await waku.dial(multiAddrWithId); - await waitPromise; - - // TODO: Should getMeshPeers be used instead? - const peers = waku.relay.getPeers(); - const nimPeerId = multiAddrWithId.getPeerId(); - - expect(nimPeerId).to.not.be.undefined; - expect(peers).includes(nimPeerId); - }); - - it("Relay - times out", function (done) { - this.timeout(5000); - createWaku({ - staticNoiseKey: NOISE_KEY_1, - }) - .then((waku) => waku.start().then(() => waku)) - .then((waku) => { - waku.waitForRemotePeer([Protocols.Relay], 200).then( - () => { - throw "Promise expected to reject on time out"; - }, - (reason) => { - expect(reason).to.eq("Timed out waiting for a remote peer."); - done(); - } - ); - }); - }); - - it("Store - dialed first", async function () { - this.timeout(20_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true }); - const multiAddrWithId = await nwaku.getMultiaddrWithId(); - - waku = await createWaku({ - staticNoiseKey: NOISE_KEY_1, - }); - await waku.start(); - await waku.dial(multiAddrWithId); - await delay(1000); - await waku.waitForRemotePeer([Protocols.Store]); - - const peers = (await waku.store.peers()).map((peer) => peer.id.toString()); - const nimPeerId = multiAddrWithId.getPeerId(); - - expect(nimPeerId).to.not.be.undefined; - expect(peers.includes(nimPeerId as string)).to.be.true; - }); - - it("Store - dialed after - with timeout", async function () { - this.timeout(20_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true }); - const multiAddrWithId = await nwaku.getMultiaddrWithId(); - - waku = await createWaku({ - staticNoiseKey: NOISE_KEY_1, - }); - await waku.start(); - const waitPromise = waku.waitForRemotePeer([Protocols.Store], 2000); - await delay(1000); - await waku.dial(multiAddrWithId); - await waitPromise; - - const peers = (await waku.store.peers()).map((peer) => peer.id.toString()); - - const nimPeerId = multiAddrWithId.getPeerId(); - - expect(nimPeerId).to.not.be.undefined; - expect(peers.includes(nimPeerId as string)).to.be.true; - }); - - it("LightPush", async function () { - this.timeout(20_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ lightpush: true }); - const multiAddrWithId = await nwaku.getMultiaddrWithId(); - - waku = await createWaku({ - staticNoiseKey: NOISE_KEY_1, - }); - await waku.start(); - await waku.dial(multiAddrWithId); - await waku.waitForRemotePeer([Protocols.LightPush]); - - const peers = (await waku.lightPush.peers()).map((peer) => - peer.id.toString() - ); - - const nimPeerId = multiAddrWithId.getPeerId(); - - expect(nimPeerId).to.not.be.undefined; - expect(peers.includes(nimPeerId as string)).to.be.true; - }); - - it("Filter", async function () { - this.timeout(20_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ filter: true }); - const multiAddrWithId = await nwaku.getMultiaddrWithId(); - - waku = await createWaku({ - staticNoiseKey: NOISE_KEY_1, - }); - await waku.start(); - await waku.dial(multiAddrWithId); - await waku.waitForRemotePeer([Protocols.Filter]); - - const peers = (await waku.filter.peers()).map((peer) => peer.id.toString()); - - const nimPeerId = multiAddrWithId.getPeerId(); - - expect(nimPeerId).to.not.be.undefined; - expect(peers.includes(nimPeerId as string)).to.be.true; - }); -}); diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 6676d8db39..dc44e705b7 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -1,7 +1,6 @@ import { Noise } from "@chainsafe/libp2p-noise"; import type { Stream } from "@libp2p/interface-connection"; import type { PeerId } from "@libp2p/interface-peer-id"; -import { PeerProtocolsChangeData } from "@libp2p/interface-peer-store"; import { Mplex } from "@libp2p/mplex"; import { peerIdFromString } from "@libp2p/peer-id"; import { WebSockets } from "@libp2p/websockets"; @@ -276,131 +275,6 @@ export class Waku { return localMultiaddr + "/p2p/" + this.libp2p.peerId.toString(); } - /** - * Wait for a remote peer to be ready given the passed protocols. - * Useful when using the [[CreateOptions.bootstrap]] with [[Waku.create]]. - * - * @param protocols The protocols that need to be enabled by remote peers. - * @param timeoutMs A timeout value in milliseconds.. - * - * @returns A promise that **resolves** if all desired protocols are fulfilled by - * remote nodes, **rejects** if the timeoutMs is reached. - * - * @default Remote peer must have Waku Relay enabled and no time out is applied. - */ - async waitForRemotePeer( - protocols?: Protocols[], - timeoutMs?: number - ): Promise { - protocols = protocols ?? [Protocols.Relay]; - - const promises: Promise[] = []; - - if (protocols.includes(Protocols.Relay)) { - const peers = this.relay.getMeshPeers(this.relay.pubSubTopic); - - if (peers.length == 0) { - // No peer yet available, wait for a subscription - const promise = new Promise((resolve) => { - // TODO: Remove listeners once done - this.relay.addEventListener("subscription-change", () => { - // Remote peer subscribed to topic, now wait for a heartbeat - // so that the mesh is updated and the remote peer added to it - this.relay.addEventListener("gossipsub:heartbeat", () => resolve()); - }); - }); - promises.push(promise); - } - } - - // TODO: This can be factored in one helper function - // Probably need to add a "string" protocol to each class to make it easier - if (protocols.includes(Protocols.Store)) { - const storePromise = (async (): Promise => { - const peers = await this.store.peers(); - - if (peers.length) { - log("Store peer found: ", peers[0].id.toString()); - return; - } - - await new Promise((resolve) => { - const cb = (evt: CustomEvent): void => { - for (const codec of Object.values(StoreCodecs)) { - if (evt.detail.protocols.includes(codec)) { - log("Resolving for", StoreCodecs, evt.detail.protocols); - this.libp2p.peerStore.removeEventListener( - "change:protocols", - cb - ); - resolve(); - break; - } - } - }; - this.libp2p.peerStore.addEventListener("change:protocols", cb); - }); - })(); - promises.push(storePromise); - } - - if (protocols.includes(Protocols.LightPush)) { - const lightPushPromise = (async (): Promise => { - const peers = await this.lightPush.peers(); - - if (peers.length) { - log("Light Push peer found: ", peers[0].id.toString()); - return; - } - - await new Promise((resolve) => { - const cb = (evt: CustomEvent): void => { - if (evt.detail.protocols.includes(LightPushCodec)) { - log("Resolving for", LightPushCodec, evt.detail.protocols); - this.libp2p.peerStore.removeEventListener("change:protocols", cb); - resolve(); - } - }; - this.libp2p.peerStore.addEventListener("change:protocols", cb); - }); - })(); - promises.push(lightPushPromise); - } - - if (protocols.includes(Protocols.Filter)) { - const filterPromise = (async (): Promise => { - const peers = await this.filter.peers(); - - if (peers.length) { - log("Filter peer found: ", peers[0].id.toString()); - return; - } - - await new Promise((resolve) => { - const cb = (evt: CustomEvent): void => { - if (evt.detail.protocols.includes(FilterCodec)) { - log("Resolving for", FilterCodec, evt.detail.protocols); - this.libp2p.peerStore.removeEventListener("change:protocols", cb); - resolve(); - } - }; - this.libp2p.peerStore.addEventListener("change:protocols", cb); - }); - })(); - promises.push(filterPromise); - } - - if (timeoutMs) { - await rejectOnTimeout( - Promise.all(promises), - timeoutMs, - "Timed out waiting for a remote peer." - ); - } else { - await Promise.all(promises); - } - } - private startKeepAlive( peerId: PeerId, pingPeriodSecs: number, @@ -454,14 +328,3 @@ export class Waku { this.relayKeepAliveTimers = {}; } } - -const awaitTimeout = (ms: number, rejectReason: string): Promise => - new Promise((_resolve, reject) => setTimeout(() => reject(rejectReason), ms)); - -async function rejectOnTimeout( - promise: Promise, - timeoutMs: number, - rejectReason: string -): Promise { - await Promise.race([promise, awaitTimeout(timeoutMs, rejectReason)]); -} diff --git a/src/lib/waku_filter/index.node.spec.ts b/src/lib/waku_filter/index.node.spec.ts index ff15aadcf2..05054e31f3 100644 --- a/src/lib/waku_filter/index.node.spec.ts +++ b/src/lib/waku_filter/index.node.spec.ts @@ -3,6 +3,7 @@ import debug from "debug"; import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils"; import { delay } from "../../test_utils/delay"; +import { waitForRemotePeer } from "../wait_for_remote_peer"; import { createWaku, Protocols, Waku } from "../waku"; import { WakuMessage } from "../waku_message"; @@ -29,7 +30,7 @@ describe("Waku Filter", () => { }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForRemotePeer([Protocols.Filter, Protocols.LightPush]); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); }); it("creates a subscription", async function () { diff --git a/src/lib/waku_light_push/index.node.spec.ts b/src/lib/waku_light_push/index.node.spec.ts index 2d5e888f55..d67a59c223 100644 --- a/src/lib/waku_light_push/index.node.spec.ts +++ b/src/lib/waku_light_push/index.node.spec.ts @@ -3,6 +3,7 @@ import debug from "debug"; import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils"; import { delay } from "../../test_utils/delay"; +import { waitForRemotePeer } from "../wait_for_remote_peer"; import { createWaku, Protocols, Waku } from "../waku"; import { WakuMessage } from "../waku_message"; @@ -30,7 +31,7 @@ describe("Waku Light Push [node only]", () => { }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForRemotePeer([Protocols.LightPush]); + await waitForRemotePeer(waku, [Protocols.LightPush]); const messageText = "Light Push works!"; const message = await WakuMessage.fromUtf8String( @@ -67,7 +68,7 @@ describe("Waku Light Push [node only]", () => { }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForRemotePeer([Protocols.LightPush]); + await waitForRemotePeer(waku, [Protocols.LightPush]); const nimPeerId = await nwaku.getPeerId(); diff --git a/src/lib/waku_message/index.node.spec.ts b/src/lib/waku_message/index.node.spec.ts index db23835753..e26843b505 100644 --- a/src/lib/waku_message/index.node.spec.ts +++ b/src/lib/waku_message/index.node.spec.ts @@ -14,6 +14,7 @@ import { getPublicKey, } from "../crypto"; import { bytesToHex, bytesToUtf8, hexToBytes, utf8ToBytes } from "../utils"; +import { waitForRemotePeer } from "../wait_for_remote_peer"; import { createWaku, Protocols, Waku } from "../waku"; import { DecryptionMethod, WakuMessage } from "./index"; @@ -41,7 +42,7 @@ describe("Waku Message [node only]", function () { dbg("Dialing to nwaku node"); await waku.dial(await nwaku.getMultiaddrWithId()); dbg("Wait for remote peer"); - await waku.waitForRemotePeer([Protocols.Relay]); + await waitForRemotePeer(waku, [Protocols.Relay]); dbg("Remote peer ready"); // As this test uses the nwaku RPC API, we somehow often face // Race conditions where the nwaku node does not have the js-waku diff --git a/src/lib/waku_relay/index.node.spec.ts b/src/lib/waku_relay/index.node.spec.ts index e2a99665b8..d99ee00a5c 100644 --- a/src/lib/waku_relay/index.node.spec.ts +++ b/src/lib/waku_relay/index.node.spec.ts @@ -15,6 +15,7 @@ import { generateSymmetricKey, getPublicKey, } from "../crypto"; +import { waitForRemotePeer } from "../wait_for_remote_peer"; import { createWaku, Protocols, Waku } from "../waku"; import { DecryptionMethod, WakuMessage } from "../waku_message"; @@ -55,8 +56,8 @@ describe("Waku Relay [node only]", () => { log("Wait for mutual pubsub subscription"); await Promise.all([ - waku1.waitForRemotePeer([Protocols.Relay]), - waku2.waitForRemotePeer([Protocols.Relay]), + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), ]); log("before each hook done"); }); @@ -284,8 +285,8 @@ describe("Waku Relay [node only]", () => { ); await Promise.all([ - waku1.waitForRemotePeer([Protocols.Relay]), - waku2.waitForRemotePeer([Protocols.Relay]), + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), // No subscription change expected for Waku 3 ]); @@ -334,7 +335,7 @@ describe("Waku Relay [node only]", () => { await nwaku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForRemotePeer([Protocols.Relay]); + await waitForRemotePeer(waku, [Protocols.Relay]); }); afterEach(async function () { @@ -437,8 +438,8 @@ describe("Waku Relay [node only]", () => { // Wait for identify protocol to finish await Promise.all([ - waku1.waitForRemotePeer([Protocols.Relay]), - waku2.waitForRemotePeer([Protocols.Relay]), + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), ]); await delay(2000); diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index 2a99e5c896..e814819865 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -12,6 +12,7 @@ import { generateSymmetricKey, getPublicKey, } from "../crypto"; +import { waitForRemotePeer } from "../wait_for_remote_peer"; import { createWaku, Protocols, Waku } from "../waku"; import { DecryptionMethod, WakuMessage } from "../waku_message"; @@ -51,7 +52,7 @@ describe("Waku Store", () => { }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForRemotePeer([Protocols.Store]); + await waitForRemotePeer(waku, [Protocols.Store]); const messages = await waku.store.queryHistory([]); expect(messages?.length).eq(2); @@ -84,7 +85,7 @@ describe("Waku Store", () => { }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForRemotePeer([Protocols.Store]); + await waitForRemotePeer(waku, [Protocols.Store]); let messages: WakuMessage[] = []; @@ -124,7 +125,7 @@ describe("Waku Store", () => { }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForRemotePeer([Protocols.Store]); + await waitForRemotePeer(waku, [Protocols.Store]); let messages: WakuMessage[] = []; const desiredMsgs = 14; @@ -161,7 +162,7 @@ describe("Waku Store", () => { }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForRemotePeer([Protocols.Store]); + await waitForRemotePeer(waku, [Protocols.Store]); const messages = await waku.store.queryHistory([], { pageDirection: PageDirection.FORWARD, @@ -201,7 +202,7 @@ describe("Waku Store", () => { }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForRemotePeer([Protocols.Store]); + await waitForRemotePeer(waku, [Protocols.Store]); const nimPeerId = await nwaku.getPeerId(); @@ -282,7 +283,7 @@ describe("Waku Store", () => { dbg("Waku nodes connected to nwaku"); - await waku1.waitForRemotePeer([Protocols.LightPush]); + await waitForRemotePeer(waku1, [Protocols.LightPush]); dbg("Sending messages using light push"); await Promise.all([ @@ -292,7 +293,7 @@ describe("Waku Store", () => { waku1.lightPush.push(clearMessage), ]); - await waku2.waitForRemotePeer([Protocols.Store]); + await waitForRemotePeer(waku2, [Protocols.Store]); waku2.store.addDecryptionKey(symKey); @@ -386,7 +387,7 @@ describe("Waku Store", () => { dbg("Waku nodes connected to nwaku"); - await waku1.waitForRemotePeer([Protocols.LightPush]); + await waitForRemotePeer(waku1, [Protocols.LightPush]); dbg("Sending messages using light push"); await Promise.all([ @@ -396,7 +397,7 @@ describe("Waku Store", () => { waku1.lightPush.push(clearMessage), ]); - await waku2.waitForRemotePeer([Protocols.Store]); + await waitForRemotePeer(waku2, [Protocols.Store]); waku2.addDecryptionKey(symKey, { contentTopics: [encryptedSymmetricContentTopic], @@ -460,7 +461,7 @@ describe("Waku Store", () => { }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForRemotePeer([Protocols.Store]); + await waitForRemotePeer(waku, [Protocols.Store]); const nwakuPeerId = await nwaku.getPeerId();