Merge pull request #744 from status-im/improve-error

Improve `waitForRemotePeer` API
This commit is contained in:
Franck R 2022-05-19 16:30:38 +10:00 committed by GitHub
commit d267d44b3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 131 additions and 66 deletions

View File

@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- `waitForRemotePeer` now accepts a `timeoutMs` parameter that rejects the promise if it is reached. By default, no timeout is applied.
### Changed
- `waitForRemotePeer` waits for a Relay peer by default instead of Relay and Store.
## [0.23.0] - 2022-05-19
### Added

View File

@ -1,5 +1,4 @@
import { expect } from "chai";
import debug from "debug";
import PeerId from "peer-id";
import {
@ -8,13 +7,12 @@ import {
NOISE_KEY_2,
Nwaku,
} from "../test_utils/";
import { delay } from "../test_utils/delay";
import { Protocols, Waku } from "./waku";
import { WakuMessage } from "./waku_message";
import { generateSymmetricKey } from "./waku_message/version_1";
const dbg = debug("waku:test");
const TestContentTopic = "/test/1/waku/utf8";
describe("Waku Dial [node only]", function () {
@ -168,28 +166,28 @@ describe("Decryption Keys", () => {
describe("Wait for remote peer / get peers", function () {
let waku: Waku;
let nwaku: Nwaku;
let nwaku: Nwaku | undefined;
afterEach(async function () {
!!nwaku && nwaku.stop();
if (nwaku) {
nwaku.stop();
nwaku = undefined;
}
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
});
it("Relay", async function () {
it("Relay - dialed first", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start();
const multiAddrWithId = await nwaku.getMultiaddrWithId();
dbg("Create");
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
});
dbg("Dial");
await waku.dial(multiAddrWithId);
dbg("waitForRemotePeer");
await delay(1000);
await waku.waitForRemotePeer([Protocols.Relay]);
dbg("Done, get peers");
const peers = waku.relay.getPeers();
const nimPeerId = multiAddrWithId.getPeerId();
@ -197,7 +195,46 @@ describe("Wait for remote peer / get peers", function () {
expect(peers.has(nimPeerId as string)).to.be.true;
});
it("Store", async function () {
it("Relay - dialed after", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start();
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
});
const waitPromise = waku.waitForRemotePeer([Protocols.Relay]);
await delay(1000);
await waku.dial(multiAddrWithId);
await waitPromise;
const peers = waku.relay.getPeers();
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.has(nimPeerId as string)).to.be.true;
});
it("Relay - times out", function (done) {
this.timeout(5000);
Waku.create({
staticNoiseKey: NOISE_KEY_1,
}).then((waku) => {
waku.waitForRemotePeer([Protocols.Relay], 200).then(
() => {
throw "Promise expected to reject on time out";
},
(reason) => {
expect(reason).to.eq("Timed out waiting for a remote peer.");
done();
}
);
});
});
it("Store - dialed first", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ persistMessages: true });
@ -207,6 +244,7 @@ describe("Wait for remote peer / get peers", function () {
staticNoiseKey: NOISE_KEY_1,
});
await waku.dial(multiAddrWithId);
await delay(1000);
await waku.waitForRemotePeer([Protocols.Store]);
const peers = [];
@ -220,6 +258,31 @@ describe("Wait for remote peer / get peers", function () {
expect(peers.includes(nimPeerId as string)).to.be.true;
});
it("Store - dialed after - with timeout", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ persistMessages: true });
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
});
const waitPromise = waku.waitForRemotePeer([Protocols.Store], 2000);
await delay(1000);
await waku.dial(multiAddrWithId);
await waitPromise;
const peers = [];
for await (const peer of waku.store.peers) {
peers.push(peer.id.toB58String());
}
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.includes(nimPeerId as string)).to.be.true;
});
it("LightPush", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));

View File

@ -329,14 +329,23 @@ export class Waku {
* Wait for a remote peer to be ready given the passed protocols.
* Useful when using the [[CreateOptions.bootstrap]] with [[Waku.create]].
*
* @default Remote peer must have Waku Store and Waku Relay enabled.
* @param protocols The protocols that need to be enabled by remote peers.
* @param timeoutMs A timeout value in milliseconds..
*
* @returns A promise that **resolves** if all desired protocols are fulfilled by
* remote nodes, **rejects** if the timeoutMs is reached.
*
* @default Remote peer must have Waku Relay enabled and no time out is applied.
*/
async waitForRemotePeer(protocols?: Protocols[]): Promise<void> {
const desiredProtocols = protocols ?? [Protocols.Relay, Protocols.Store];
async waitForRemotePeer(
protocols?: Protocols[],
timeoutMs?: number
): Promise<void> {
protocols = protocols ?? [Protocols.Relay];
const promises = [];
const promises: Promise<void>[] = [];
if (desiredProtocols.includes(Protocols.Relay)) {
if (protocols.includes(Protocols.Relay)) {
const peers = this.relay.getPeers();
if (peers.size == 0) {
@ -352,60 +361,35 @@ export class Waku {
}
}
if (desiredProtocols.includes(Protocols.Store)) {
let storePeerFound = false;
for await (const _peer of this.store.peers) {
storePeerFound = true;
break;
}
if (!storePeerFound) {
// No peer available for this protocol, waiting to connect to one.
const promise = new Promise<void>((resolve) => {
this.libp2p.peerStore.on(
"change:protocols",
({ protocols: connectedPeerProtocols }) => {
for (const codec of Object.values(StoreCodecs)) {
if (connectedPeerProtocols.includes(codec)) {
dbg("Resolving for", codec, connectedPeerProtocols);
resolve();
}
}
}
);
});
promises.push(promise);
}
if (protocols.includes(Protocols.Store)) {
const storePromise = (async (): Promise<void> => {
for await (const peer of this.store.peers) {
dbg("Store peer found", peer.id.toB58String());
break;
}
})();
promises.push(storePromise);
}
if (desiredProtocols.includes(Protocols.LightPush)) {
let lightPushPeerFound = false;
for await (const _peer of this.lightPush.peers) {
lightPushPeerFound = true;
break;
}
if (!lightPushPeerFound) {
// No peer available for this protocol, waiting to connect to one.
const promise = new Promise<void>((resolve) => {
this.libp2p.peerStore.on(
"change:protocols",
({ protocols: connectedPeerProtocols }) => {
if (connectedPeerProtocols.includes(LightPushCodec)) {
dbg("Resolving for", LightPushCodec, connectedPeerProtocols);
resolve();
}
}
);
});
promises.push(promise);
}
if (protocols.includes(Protocols.LightPush)) {
const lightPushPromise = (async (): Promise<void> => {
for await (const peer of this.lightPush.peers) {
dbg("Light Push peer found", peer.id.toB58String());
break;
}
})();
promises.push(lightPushPromise);
}
await Promise.all(promises);
if (timeoutMs) {
await rejectOnTimeout(
Promise.all(promises),
timeoutMs,
"Timed out waiting for a remote peer."
);
} else {
await Promise.all(promises);
}
}
private startKeepAlive(
@ -450,3 +434,13 @@ export class Waku {
}
}
}
const awaitTimeout = (ms: number, rejectReason: string): Promise<void> =>
new Promise((_resolve, reject) => setTimeout(() => reject(rejectReason), ms));
const rejectOnTimeout = (
promise: Promise<any>,
timeoutMs: number,
rejectReason: string
): Promise<void> =>
Promise.race([promise, awaitTimeout(timeoutMs, rejectReason)]);