From 4b43db6daaf0e04951f7632e5d854be4b2abea77 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Mon, 16 May 2022 15:47:54 +1000 Subject: [PATCH 1/4] Change default to wait for Relay only Store is not used by all applications, also, note all nodes have store enabled. Default value should prefer working software over feature complexity. --- CHANGELOG.md | 4 ++++ src/lib/waku.ts | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b475c7fb41..4293fef6ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- `waitForRemotePeer` waits for a Relay peer by default instead of Relay and Store. + ## [0.23.0] - 2022-05-19 ### Added diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 76b6a3b27a..ace3238d56 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -329,10 +329,10 @@ export class Waku { * Wait for a remote peer to be ready given the passed protocols. * Useful when using the [[CreateOptions.bootstrap]] with [[Waku.create]]. * - * @default Remote peer must have Waku Store and Waku Relay enabled. + * @default Remote peer must have Waku Relay enabled. */ async waitForRemotePeer(protocols?: Protocols[]): Promise { - const desiredProtocols = protocols ?? [Protocols.Relay, Protocols.Store]; + const desiredProtocols = protocols ?? [Protocols.Relay]; const promises = []; From d9940f4d9bbaa095212efded81deee3058c25337 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Tue, 17 May 2022 18:23:50 +1000 Subject: [PATCH 2/4] Simplify async iterator usage The iterator will return only once there is a first item available. Hence, there is no need to add an observer on `change:protocols` event. --- src/lib/waku.node.spec.ts | 61 +++++++++++++++++++++++++++++++++------ src/lib/waku.ts | 61 +++++++++------------------------------ 2 files changed, 66 insertions(+), 56 deletions(-) diff --git a/src/lib/waku.node.spec.ts b/src/lib/waku.node.spec.ts index 437573bb99..96d80742bc 100644 --- a/src/lib/waku.node.spec.ts +++ b/src/lib/waku.node.spec.ts @@ -1,5 +1,4 @@ import { expect } from "chai"; -import debug from "debug"; import PeerId from "peer-id"; import { @@ -8,13 +7,12 @@ import { NOISE_KEY_2, Nwaku, } from "../test_utils/"; +import { delay } from "../test_utils/delay"; import { Protocols, Waku } from "./waku"; import { WakuMessage } from "./waku_message"; import { generateSymmetricKey } from "./waku_message/version_1"; -const dbg = debug("waku:test"); - const TestContentTopic = "/test/1/waku/utf8"; describe("Waku Dial [node only]", function () { @@ -175,21 +173,18 @@ describe("Wait for remote peer / get peers", function () { !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); }); - it("Relay", async function () { + it("Relay - dialed first", async function () { this.timeout(20_000); nwaku = new Nwaku(makeLogFileName(this)); await nwaku.start(); const multiAddrWithId = await nwaku.getMultiaddrWithId(); - dbg("Create"); waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, }); - dbg("Dial"); await waku.dial(multiAddrWithId); - dbg("waitForRemotePeer"); + await delay(1000); await waku.waitForRemotePeer([Protocols.Relay]); - dbg("Done, get peers"); const peers = waku.relay.getPeers(); const nimPeerId = multiAddrWithId.getPeerId(); @@ -197,7 +192,29 @@ describe("Wait for remote peer / get peers", function () { expect(peers.has(nimPeerId as string)).to.be.true; }); - it("Store", async function () { + it("Relay - dialed after", async function () { + this.timeout(20_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start(); + const multiAddrWithId = await nwaku.getMultiaddrWithId(); + + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + }); + + const waitPromise = waku.waitForRemotePeer([Protocols.Relay]); + await delay(1000); + await waku.dial(multiAddrWithId); + await waitPromise; + + const peers = waku.relay.getPeers(); + const nimPeerId = multiAddrWithId.getPeerId(); + + expect(nimPeerId).to.not.be.undefined; + expect(peers.has(nimPeerId as string)).to.be.true; + }); + + it("Store - dialed first", async function () { this.timeout(20_000); nwaku = new Nwaku(makeLogFileName(this)); await nwaku.start({ persistMessages: true }); @@ -207,6 +224,7 @@ describe("Wait for remote peer / get peers", function () { staticNoiseKey: NOISE_KEY_1, }); await waku.dial(multiAddrWithId); + await delay(1000); await waku.waitForRemotePeer([Protocols.Store]); const peers = []; @@ -220,6 +238,31 @@ describe("Wait for remote peer / get peers", function () { expect(peers.includes(nimPeerId as string)).to.be.true; }); + it("Store - dialed after", async function () { + this.timeout(20_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ persistMessages: true }); + const multiAddrWithId = await nwaku.getMultiaddrWithId(); + + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + }); + const waitPromise = waku.waitForRemotePeer([Protocols.Store]); + await delay(1000); + await waku.dial(multiAddrWithId); + await waitPromise; + + const peers = []; + for await (const peer of waku.store.peers) { + peers.push(peer.id.toB58String()); + } + + const nimPeerId = multiAddrWithId.getPeerId(); + + expect(nimPeerId).to.not.be.undefined; + expect(peers.includes(nimPeerId as string)).to.be.true; + }); + it("LightPush", async function () { this.timeout(20_000); nwaku = new Nwaku(makeLogFileName(this)); diff --git a/src/lib/waku.ts b/src/lib/waku.ts index ace3238d56..5a06c91f3b 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -353,56 +353,23 @@ export class Waku { } if (desiredProtocols.includes(Protocols.Store)) { - let storePeerFound = false; - - for await (const _peer of this.store.peers) { - storePeerFound = true; - break; - } - - if (!storePeerFound) { - // No peer available for this protocol, waiting to connect to one. - const promise = new Promise((resolve) => { - this.libp2p.peerStore.on( - "change:protocols", - ({ protocols: connectedPeerProtocols }) => { - for (const codec of Object.values(StoreCodecs)) { - if (connectedPeerProtocols.includes(codec)) { - dbg("Resolving for", codec, connectedPeerProtocols); - resolve(); - } - } - } - ); - }); - promises.push(promise); - } + const storePromise = (async (): Promise => { + for await (const peer of this.store.peers) { + dbg("Store peer found", peer.id.toB58String()); + break; + } + })(); + promises.push(storePromise); } if (desiredProtocols.includes(Protocols.LightPush)) { - let lightPushPeerFound = false; - - for await (const _peer of this.lightPush.peers) { - lightPushPeerFound = true; - break; - } - - if (!lightPushPeerFound) { - // No peer available for this protocol, waiting to connect to one. - const promise = new Promise((resolve) => { - this.libp2p.peerStore.on( - "change:protocols", - ({ protocols: connectedPeerProtocols }) => { - if (connectedPeerProtocols.includes(LightPushCodec)) { - dbg("Resolving for", LightPushCodec, connectedPeerProtocols); - resolve(); - } - } - ); - }); - - promises.push(promise); - } + const lightPushPromise = (async (): Promise => { + for await (const peer of this.lightPush.peers) { + dbg("Light Push peer found", peer.id.toB58String()); + break; + } + })(); + promises.push(lightPushPromise); } await Promise.all(promises); From 06930c15dcbae7a54c3e75564749efca7639e1bd Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Tue, 17 May 2022 18:38:43 +1000 Subject: [PATCH 3/4] Remove unneeded variable --- src/lib/waku.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 5a06c91f3b..fb20d16249 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -332,11 +332,11 @@ export class Waku { * @default Remote peer must have Waku Relay enabled. */ async waitForRemotePeer(protocols?: Protocols[]): Promise { - const desiredProtocols = protocols ?? [Protocols.Relay]; + protocols = protocols ?? [Protocols.Relay]; const promises = []; - if (desiredProtocols.includes(Protocols.Relay)) { + if (protocols.includes(Protocols.Relay)) { const peers = this.relay.getPeers(); if (peers.size == 0) { @@ -352,7 +352,7 @@ export class Waku { } } - if (desiredProtocols.includes(Protocols.Store)) { + if (protocols.includes(Protocols.Store)) { const storePromise = (async (): Promise => { for await (const peer of this.store.peers) { dbg("Store peer found", peer.id.toB58String()); @@ -362,7 +362,7 @@ export class Waku { promises.push(storePromise); } - if (desiredProtocols.includes(Protocols.LightPush)) { + if (protocols.includes(Protocols.LightPush)) { const lightPushPromise = (async (): Promise => { for await (const peer of this.lightPush.peers) { dbg("Light Push peer found", peer.id.toB58String()); From aad678a7083c8df75b8e3e00d895b1800716ff45 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 18 May 2022 15:55:08 +1000 Subject: [PATCH 4/4] Add a timeout to `waitForRemotePeer` --- CHANGELOG.md | 4 ++++ src/lib/waku.node.spec.ts | 28 ++++++++++++++++++++++++---- src/lib/waku.ts | 35 +++++++++++++++++++++++++++++++---- 3 files changed, 59 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4293fef6ff..df498dae70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `waitForRemotePeer` now accepts a `timeoutMs` parameter that rejects the promise if it is reached. By default, no timeout is applied. + ### Changed - `waitForRemotePeer` waits for a Relay peer by default instead of Relay and Store. diff --git a/src/lib/waku.node.spec.ts b/src/lib/waku.node.spec.ts index 96d80742bc..81f3b25d53 100644 --- a/src/lib/waku.node.spec.ts +++ b/src/lib/waku.node.spec.ts @@ -166,10 +166,13 @@ describe("Decryption Keys", () => { describe("Wait for remote peer / get peers", function () { let waku: Waku; - let nwaku: Nwaku; + let nwaku: Nwaku | undefined; afterEach(async function () { - !!nwaku && nwaku.stop(); + if (nwaku) { + nwaku.stop(); + nwaku = undefined; + } !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); }); @@ -214,6 +217,23 @@ describe("Wait for remote peer / get peers", function () { expect(peers.has(nimPeerId as string)).to.be.true; }); + it("Relay - times out", function (done) { + this.timeout(5000); + Waku.create({ + staticNoiseKey: NOISE_KEY_1, + }).then((waku) => { + waku.waitForRemotePeer([Protocols.Relay], 200).then( + () => { + throw "Promise expected to reject on time out"; + }, + (reason) => { + expect(reason).to.eq("Timed out waiting for a remote peer."); + done(); + } + ); + }); + }); + it("Store - dialed first", async function () { this.timeout(20_000); nwaku = new Nwaku(makeLogFileName(this)); @@ -238,7 +258,7 @@ describe("Wait for remote peer / get peers", function () { expect(peers.includes(nimPeerId as string)).to.be.true; }); - it("Store - dialed after", async function () { + it("Store - dialed after - with timeout", async function () { this.timeout(20_000); nwaku = new Nwaku(makeLogFileName(this)); await nwaku.start({ persistMessages: true }); @@ -247,7 +267,7 @@ describe("Wait for remote peer / get peers", function () { waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, }); - const waitPromise = waku.waitForRemotePeer([Protocols.Store]); + const waitPromise = waku.waitForRemotePeer([Protocols.Store], 2000); await delay(1000); await waku.dial(multiAddrWithId); await waitPromise; diff --git a/src/lib/waku.ts b/src/lib/waku.ts index fb20d16249..bd82fe3e76 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -329,12 +329,21 @@ export class Waku { * Wait for a remote peer to be ready given the passed protocols. * Useful when using the [[CreateOptions.bootstrap]] with [[Waku.create]]. * - * @default Remote peer must have Waku Relay enabled. + * @param protocols The protocols that need to be enabled by remote peers. + * @param timeoutMs A timeout value in milliseconds.. + * + * @returns A promise that **resolves** if all desired protocols are fulfilled by + * remote nodes, **rejects** if the timeoutMs is reached. + * + * @default Remote peer must have Waku Relay enabled and no time out is applied. */ - async waitForRemotePeer(protocols?: Protocols[]): Promise { + async waitForRemotePeer( + protocols?: Protocols[], + timeoutMs?: number + ): Promise { protocols = protocols ?? [Protocols.Relay]; - const promises = []; + const promises: Promise[] = []; if (protocols.includes(Protocols.Relay)) { const peers = this.relay.getPeers(); @@ -372,7 +381,15 @@ export class Waku { promises.push(lightPushPromise); } - await Promise.all(promises); + if (timeoutMs) { + await rejectOnTimeout( + Promise.all(promises), + timeoutMs, + "Timed out waiting for a remote peer." + ); + } else { + await Promise.all(promises); + } } private startKeepAlive( @@ -417,3 +434,13 @@ export class Waku { } } } + +const awaitTimeout = (ms: number, rejectReason: string): Promise => + new Promise((_resolve, reject) => setTimeout(() => reject(rejectReason), ms)); + +const rejectOnTimeout = ( + promise: Promise, + timeoutMs: number, + rejectReason: string +): Promise => + Promise.race([promise, awaitTimeout(timeoutMs, rejectReason)]);