mirror of https://github.com/waku-org/js-waku.git
refactor!: Move `waitForRemotePeer` to own file
This commit is contained in:
parent
da9b7b31f5
commit
811685e041
|
@ -13,6 +13,8 @@ export * as enr from "./lib/enr";
|
|||
|
||||
export * as utils from "./lib/utils";
|
||||
|
||||
export { waitForRemotePeer } from "./lib/wait_for_remote_peer";
|
||||
|
||||
export * as waku from "./lib/waku";
|
||||
export { Waku, Protocols } from "./lib/waku";
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import { expect } from "chai";
|
||||
|
||||
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils";
|
||||
import { waitForRemotePeer } from "../wait_for_remote_peer";
|
||||
import { createWaku, Protocols, Waku } from "../waku";
|
||||
|
||||
import { ENR } from "./enr";
|
||||
|
@ -30,7 +31,7 @@ describe("ENR Interop: nwaku", function () {
|
|||
});
|
||||
await waku.start();
|
||||
await waku.dial(multiAddrWithId);
|
||||
await waku.waitForRemotePeer([Protocols.Relay]);
|
||||
await waitForRemotePeer(waku, [Protocols.Relay]);
|
||||
|
||||
const nwakuInfo = await nwaku.info();
|
||||
const nimPeerId = await nwaku.getPeerId();
|
||||
|
@ -62,7 +63,7 @@ describe("ENR Interop: nwaku", function () {
|
|||
});
|
||||
await waku.start();
|
||||
await waku.dial(multiAddrWithId);
|
||||
await waku.waitForRemotePeer([Protocols.Relay]);
|
||||
await waitForRemotePeer(waku, [Protocols.Relay]);
|
||||
|
||||
const nwakuInfo = await nwaku.info();
|
||||
const nimPeerId = await nwaku.getPeerId();
|
||||
|
@ -94,7 +95,7 @@ describe("ENR Interop: nwaku", function () {
|
|||
});
|
||||
await waku.start();
|
||||
await waku.dial(multiAddrWithId);
|
||||
await waku.waitForRemotePeer([Protocols.Relay]);
|
||||
await waitForRemotePeer(waku, [Protocols.Relay]);
|
||||
|
||||
const nwakuInfo = await nwaku.info();
|
||||
const nimPeerId = await nwaku.getPeerId();
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
import { expect } from "chai";
|
||||
|
||||
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../test_utils";
|
||||
import { delay } from "../test_utils/delay";
|
||||
|
||||
import { waitForRemotePeer } from "./wait_for_remote_peer";
|
||||
import { createWaku, Protocols, Waku } from "./waku";
|
||||
|
||||
describe("Wait for remote peer", function () {
|
||||
let waku: Waku;
|
||||
let nwaku: Nwaku | undefined;
|
||||
|
||||
afterEach(async function () {
|
||||
if (nwaku) {
|
||||
nwaku.stop();
|
||||
nwaku = undefined;
|
||||
}
|
||||
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||
});
|
||||
|
||||
it("Relay - dialed first", async function () {
|
||||
this.timeout(20_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start();
|
||||
const multiAddrWithId = await nwaku.getMultiaddrWithId();
|
||||
|
||||
waku = await createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(multiAddrWithId);
|
||||
await delay(1000);
|
||||
await waitForRemotePeer(waku, [Protocols.Relay]);
|
||||
const peers = waku.relay.getPeers();
|
||||
const nimPeerId = multiAddrWithId.getPeerId();
|
||||
|
||||
expect(nimPeerId).to.not.be.undefined;
|
||||
expect(peers).to.includes(nimPeerId);
|
||||
});
|
||||
|
||||
it("Relay - dialed after", async function () {
|
||||
this.timeout(20_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start();
|
||||
const multiAddrWithId = await nwaku.getMultiaddrWithId();
|
||||
|
||||
waku = await createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
|
||||
const waitPromise = waitForRemotePeer(waku, [Protocols.Relay]);
|
||||
await delay(1000);
|
||||
await waku.dial(multiAddrWithId);
|
||||
await waitPromise;
|
||||
|
||||
// TODO: Should getMeshPeers be used instead?
|
||||
const peers = waku.relay.getPeers();
|
||||
const nimPeerId = multiAddrWithId.getPeerId();
|
||||
|
||||
expect(nimPeerId).to.not.be.undefined;
|
||||
expect(peers).includes(nimPeerId);
|
||||
});
|
||||
|
||||
it("Relay - times out", function (done) {
|
||||
this.timeout(5000);
|
||||
createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
})
|
||||
.then((waku) => waku.start().then(() => waku))
|
||||
.then((waku) => {
|
||||
waitForRemotePeer(waku, [Protocols.Relay], 200).then(
|
||||
() => {
|
||||
throw "Promise expected to reject on time out";
|
||||
},
|
||||
(reason) => {
|
||||
expect(reason).to.eq("Timed out waiting for a remote peer.");
|
||||
done();
|
||||
}
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
it("Store - dialed first", async function () {
|
||||
this.timeout(20_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ persistMessages: true });
|
||||
const multiAddrWithId = await nwaku.getMultiaddrWithId();
|
||||
|
||||
waku = await createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(multiAddrWithId);
|
||||
await delay(1000);
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
const peers = (await waku.store.peers()).map((peer) => peer.id.toString());
|
||||
const nimPeerId = multiAddrWithId.getPeerId();
|
||||
|
||||
expect(nimPeerId).to.not.be.undefined;
|
||||
expect(peers.includes(nimPeerId as string)).to.be.true;
|
||||
});
|
||||
|
||||
it("Store - dialed after - with timeout", async function () {
|
||||
this.timeout(20_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ persistMessages: true });
|
||||
const multiAddrWithId = await nwaku.getMultiaddrWithId();
|
||||
|
||||
waku = await createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
const waitPromise = waitForRemotePeer(waku, [Protocols.Store], 2000);
|
||||
await delay(1000);
|
||||
await waku.dial(multiAddrWithId);
|
||||
await waitPromise;
|
||||
|
||||
const peers = (await waku.store.peers()).map((peer) => peer.id.toString());
|
||||
|
||||
const nimPeerId = multiAddrWithId.getPeerId();
|
||||
|
||||
expect(nimPeerId).to.not.be.undefined;
|
||||
expect(peers.includes(nimPeerId as string)).to.be.true;
|
||||
});
|
||||
|
||||
it("LightPush", async function () {
|
||||
this.timeout(20_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ lightpush: true });
|
||||
const multiAddrWithId = await nwaku.getMultiaddrWithId();
|
||||
|
||||
waku = await createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(multiAddrWithId);
|
||||
await waitForRemotePeer(waku, [Protocols.LightPush]);
|
||||
|
||||
const peers = (await waku.lightPush.peers()).map((peer) =>
|
||||
peer.id.toString()
|
||||
);
|
||||
|
||||
const nimPeerId = multiAddrWithId.getPeerId();
|
||||
|
||||
expect(nimPeerId).to.not.be.undefined;
|
||||
expect(peers.includes(nimPeerId as string)).to.be.true;
|
||||
});
|
||||
|
||||
it("Filter", async function () {
|
||||
this.timeout(20_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ filter: true });
|
||||
const multiAddrWithId = await nwaku.getMultiaddrWithId();
|
||||
|
||||
waku = await createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(multiAddrWithId);
|
||||
await waitForRemotePeer(waku, [Protocols.Filter]);
|
||||
|
||||
const peers = (await waku.filter.peers()).map((peer) => peer.id.toString());
|
||||
|
||||
const nimPeerId = multiAddrWithId.getPeerId();
|
||||
|
||||
expect(nimPeerId).to.not.be.undefined;
|
||||
expect(peers.includes(nimPeerId as string)).to.be.true;
|
||||
});
|
||||
});
|
|
@ -0,0 +1,144 @@
|
|||
import { PeerProtocolsChangeData } from "@libp2p/interface-peer-store";
|
||||
import debug from "debug";
|
||||
|
||||
import { StoreCodecs } from "./constants";
|
||||
import { Protocols, Waku } from "./waku";
|
||||
import { FilterCodec } from "./waku_filter";
|
||||
import { LightPushCodec } from "./waku_light_push";
|
||||
|
||||
const log = debug("waku:wait-for-remote-peer");
|
||||
|
||||
/**
|
||||
* Wait for a remote peer to be ready given the passed protocols.
|
||||
* Useful when using the [[CreateOptions.bootstrap]] with [[Waku.create]].
|
||||
*
|
||||
* @param waku The Waku Node
|
||||
* @param protocols The protocols that need to be enabled by remote peers.
|
||||
* @param timeoutMs A timeout value in milliseconds..
|
||||
*
|
||||
* @returns A promise that **resolves** if all desired protocols are fulfilled by
|
||||
* remote nodes, **rejects** if the timeoutMs is reached.
|
||||
*
|
||||
* @default Remote peer must have Waku Relay enabled and no time out is applied.
|
||||
*/
|
||||
export async function waitForRemotePeer(
|
||||
waku: Waku,
|
||||
protocols?: Protocols[],
|
||||
timeoutMs?: number
|
||||
): Promise<void> {
|
||||
protocols = protocols ?? [Protocols.Relay];
|
||||
|
||||
const promises: Promise<void>[] = [];
|
||||
|
||||
if (protocols.includes(Protocols.Relay)) {
|
||||
const peers = waku.relay.getMeshPeers(waku.relay.pubSubTopic);
|
||||
|
||||
if (peers.length == 0) {
|
||||
// No peer yet available, wait for a subscription
|
||||
const promise = new Promise<void>((resolve) => {
|
||||
// TODO: Remove listeners once done
|
||||
waku.relay.addEventListener("subscription-change", () => {
|
||||
// Remote peer subscribed to topic, now wait for a heartbeat
|
||||
// so that the mesh is updated and the remote peer added to it
|
||||
waku.relay.addEventListener("gossipsub:heartbeat", () => resolve());
|
||||
});
|
||||
});
|
||||
promises.push(promise);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: This can be factored in one helper function
|
||||
// Probably need to add a "string" protocol to each class to make it easier
|
||||
if (protocols.includes(Protocols.Store)) {
|
||||
const storePromise = (async (): Promise<void> => {
|
||||
const peers = await waku.store.peers();
|
||||
|
||||
if (peers.length) {
|
||||
log("Store peer found: ", peers[0].id.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const cb = (evt: CustomEvent<PeerProtocolsChangeData>): void => {
|
||||
for (const codec of Object.values(StoreCodecs)) {
|
||||
if (evt.detail.protocols.includes(codec)) {
|
||||
log("Resolving for", StoreCodecs, evt.detail.protocols);
|
||||
waku.libp2p.peerStore.removeEventListener("change:protocols", cb);
|
||||
resolve();
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
waku.libp2p.peerStore.addEventListener("change:protocols", cb);
|
||||
});
|
||||
})();
|
||||
promises.push(storePromise);
|
||||
}
|
||||
|
||||
if (protocols.includes(Protocols.LightPush)) {
|
||||
const lightPushPromise = (async (): Promise<void> => {
|
||||
const peers = await waku.lightPush.peers();
|
||||
|
||||
if (peers.length) {
|
||||
log("Light Push peer found: ", peers[0].id.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const cb = (evt: CustomEvent<PeerProtocolsChangeData>): void => {
|
||||
if (evt.detail.protocols.includes(LightPushCodec)) {
|
||||
log("Resolving for", LightPushCodec, evt.detail.protocols);
|
||||
waku.libp2p.peerStore.removeEventListener("change:protocols", cb);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
waku.libp2p.peerStore.addEventListener("change:protocols", cb);
|
||||
});
|
||||
})();
|
||||
promises.push(lightPushPromise);
|
||||
}
|
||||
|
||||
if (protocols.includes(Protocols.Filter)) {
|
||||
const filterPromise = (async (): Promise<void> => {
|
||||
const peers = await waku.filter.peers();
|
||||
|
||||
if (peers.length) {
|
||||
log("Filter peer found: ", peers[0].id.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const cb = (evt: CustomEvent<PeerProtocolsChangeData>): void => {
|
||||
if (evt.detail.protocols.includes(FilterCodec)) {
|
||||
log("Resolving for", FilterCodec, evt.detail.protocols);
|
||||
waku.libp2p.peerStore.removeEventListener("change:protocols", cb);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
waku.libp2p.peerStore.addEventListener("change:protocols", cb);
|
||||
});
|
||||
})();
|
||||
promises.push(filterPromise);
|
||||
}
|
||||
|
||||
if (timeoutMs) {
|
||||
await rejectOnTimeout(
|
||||
Promise.all(promises),
|
||||
timeoutMs,
|
||||
"Timed out waiting for a remote peer."
|
||||
);
|
||||
} else {
|
||||
await Promise.all(promises);
|
||||
}
|
||||
}
|
||||
|
||||
const awaitTimeout = (ms: number, rejectReason: string): Promise<void> =>
|
||||
new Promise((_resolve, reject) => setTimeout(() => reject(rejectReason), ms));
|
||||
|
||||
async function rejectOnTimeout<T>(
|
||||
promise: Promise<T>,
|
||||
timeoutMs: number,
|
||||
rejectReason: string
|
||||
): Promise<void> {
|
||||
await Promise.race([promise, awaitTimeout(timeoutMs, rejectReason)]);
|
||||
}
|
|
@ -7,9 +7,9 @@ import {
|
|||
NOISE_KEY_2,
|
||||
Nwaku,
|
||||
} from "../test_utils/";
|
||||
import { delay } from "../test_utils/delay";
|
||||
|
||||
import { generateSymmetricKey } from "./crypto";
|
||||
import { waitForRemotePeer } from "./wait_for_remote_peer";
|
||||
import { createWaku, Protocols, Waku } from "./waku";
|
||||
import { WakuMessage } from "./waku_message";
|
||||
|
||||
|
@ -36,7 +36,7 @@ describe("Waku Dial [node only]", function () {
|
|||
});
|
||||
await waku.start();
|
||||
await waku.dial(multiAddrWithId);
|
||||
await waku.waitForRemotePeer([Protocols.Relay]);
|
||||
await waitForRemotePeer(waku, [Protocols.Relay]);
|
||||
|
||||
const nimPeerId = await nwaku.getPeerId();
|
||||
expect(await waku.libp2p.peerStore.has(nimPeerId)).to.be.true;
|
||||
|
@ -135,8 +135,8 @@ describe("Decryption Keys", () => {
|
|||
);
|
||||
|
||||
await Promise.all([
|
||||
waku1.waitForRemotePeer([Protocols.Relay]),
|
||||
waku2.waitForRemotePeer([Protocols.Relay]),
|
||||
waitForRemotePeer(waku1, [Protocols.Relay]),
|
||||
waitForRemotePeer(waku2, [Protocols.Relay]),
|
||||
]);
|
||||
});
|
||||
|
||||
|
@ -177,167 +177,3 @@ describe("Decryption Keys", () => {
|
|||
expect(receivedMsg.timestamp?.valueOf()).to.eq(messageTimestamp.valueOf());
|
||||
});
|
||||
});
|
||||
|
||||
describe("Wait for remote peer / get peers", function () {
|
||||
let waku: Waku;
|
||||
let nwaku: Nwaku | undefined;
|
||||
|
||||
afterEach(async function () {
|
||||
if (nwaku) {
|
||||
nwaku.stop();
|
||||
nwaku = undefined;
|
||||
}
|
||||
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||
});
|
||||
|
||||
it("Relay - dialed first", async function () {
|
||||
this.timeout(20_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start();
|
||||
const multiAddrWithId = await nwaku.getMultiaddrWithId();
|
||||
|
||||
waku = await createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(multiAddrWithId);
|
||||
await delay(1000);
|
||||
await waku.waitForRemotePeer([Protocols.Relay]);
|
||||
const peers = waku.relay.getPeers();
|
||||
const nimPeerId = multiAddrWithId.getPeerId();
|
||||
|
||||
expect(nimPeerId).to.not.be.undefined;
|
||||
expect(peers).to.includes(nimPeerId);
|
||||
});
|
||||
|
||||
it("Relay - dialed after", async function () {
|
||||
this.timeout(20_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start();
|
||||
const multiAddrWithId = await nwaku.getMultiaddrWithId();
|
||||
|
||||
waku = await createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
|
||||
const waitPromise = waku.waitForRemotePeer([Protocols.Relay]);
|
||||
await delay(1000);
|
||||
await waku.dial(multiAddrWithId);
|
||||
await waitPromise;
|
||||
|
||||
// TODO: Should getMeshPeers be used instead?
|
||||
const peers = waku.relay.getPeers();
|
||||
const nimPeerId = multiAddrWithId.getPeerId();
|
||||
|
||||
expect(nimPeerId).to.not.be.undefined;
|
||||
expect(peers).includes(nimPeerId);
|
||||
});
|
||||
|
||||
it("Relay - times out", function (done) {
|
||||
this.timeout(5000);
|
||||
createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
})
|
||||
.then((waku) => waku.start().then(() => waku))
|
||||
.then((waku) => {
|
||||
waku.waitForRemotePeer([Protocols.Relay], 200).then(
|
||||
() => {
|
||||
throw "Promise expected to reject on time out";
|
||||
},
|
||||
(reason) => {
|
||||
expect(reason).to.eq("Timed out waiting for a remote peer.");
|
||||
done();
|
||||
}
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
it("Store - dialed first", async function () {
|
||||
this.timeout(20_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ persistMessages: true });
|
||||
const multiAddrWithId = await nwaku.getMultiaddrWithId();
|
||||
|
||||
waku = await createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(multiAddrWithId);
|
||||
await delay(1000);
|
||||
await waku.waitForRemotePeer([Protocols.Store]);
|
||||
|
||||
const peers = (await waku.store.peers()).map((peer) => peer.id.toString());
|
||||
const nimPeerId = multiAddrWithId.getPeerId();
|
||||
|
||||
expect(nimPeerId).to.not.be.undefined;
|
||||
expect(peers.includes(nimPeerId as string)).to.be.true;
|
||||
});
|
||||
|
||||
it("Store - dialed after - with timeout", async function () {
|
||||
this.timeout(20_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ persistMessages: true });
|
||||
const multiAddrWithId = await nwaku.getMultiaddrWithId();
|
||||
|
||||
waku = await createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
const waitPromise = waku.waitForRemotePeer([Protocols.Store], 2000);
|
||||
await delay(1000);
|
||||
await waku.dial(multiAddrWithId);
|
||||
await waitPromise;
|
||||
|
||||
const peers = (await waku.store.peers()).map((peer) => peer.id.toString());
|
||||
|
||||
const nimPeerId = multiAddrWithId.getPeerId();
|
||||
|
||||
expect(nimPeerId).to.not.be.undefined;
|
||||
expect(peers.includes(nimPeerId as string)).to.be.true;
|
||||
});
|
||||
|
||||
it("LightPush", async function () {
|
||||
this.timeout(20_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ lightpush: true });
|
||||
const multiAddrWithId = await nwaku.getMultiaddrWithId();
|
||||
|
||||
waku = await createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(multiAddrWithId);
|
||||
await waku.waitForRemotePeer([Protocols.LightPush]);
|
||||
|
||||
const peers = (await waku.lightPush.peers()).map((peer) =>
|
||||
peer.id.toString()
|
||||
);
|
||||
|
||||
const nimPeerId = multiAddrWithId.getPeerId();
|
||||
|
||||
expect(nimPeerId).to.not.be.undefined;
|
||||
expect(peers.includes(nimPeerId as string)).to.be.true;
|
||||
});
|
||||
|
||||
it("Filter", async function () {
|
||||
this.timeout(20_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ filter: true });
|
||||
const multiAddrWithId = await nwaku.getMultiaddrWithId();
|
||||
|
||||
waku = await createWaku({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(multiAddrWithId);
|
||||
await waku.waitForRemotePeer([Protocols.Filter]);
|
||||
|
||||
const peers = (await waku.filter.peers()).map((peer) => peer.id.toString());
|
||||
|
||||
const nimPeerId = multiAddrWithId.getPeerId();
|
||||
|
||||
expect(nimPeerId).to.not.be.undefined;
|
||||
expect(peers.includes(nimPeerId as string)).to.be.true;
|
||||
});
|
||||
});
|
||||
|
|
137
src/lib/waku.ts
137
src/lib/waku.ts
|
@ -1,7 +1,6 @@
|
|||
import { Noise } from "@chainsafe/libp2p-noise";
|
||||
import type { Stream } from "@libp2p/interface-connection";
|
||||
import type { PeerId } from "@libp2p/interface-peer-id";
|
||||
import { PeerProtocolsChangeData } from "@libp2p/interface-peer-store";
|
||||
import { Mplex } from "@libp2p/mplex";
|
||||
import { peerIdFromString } from "@libp2p/peer-id";
|
||||
import { WebSockets } from "@libp2p/websockets";
|
||||
|
@ -276,131 +275,6 @@ export class Waku {
|
|||
return localMultiaddr + "/p2p/" + this.libp2p.peerId.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a remote peer to be ready given the passed protocols.
|
||||
* Useful when using the [[CreateOptions.bootstrap]] with [[Waku.create]].
|
||||
*
|
||||
* @param protocols The protocols that need to be enabled by remote peers.
|
||||
* @param timeoutMs A timeout value in milliseconds..
|
||||
*
|
||||
* @returns A promise that **resolves** if all desired protocols are fulfilled by
|
||||
* remote nodes, **rejects** if the timeoutMs is reached.
|
||||
*
|
||||
* @default Remote peer must have Waku Relay enabled and no time out is applied.
|
||||
*/
|
||||
async waitForRemotePeer(
|
||||
protocols?: Protocols[],
|
||||
timeoutMs?: number
|
||||
): Promise<void> {
|
||||
protocols = protocols ?? [Protocols.Relay];
|
||||
|
||||
const promises: Promise<void>[] = [];
|
||||
|
||||
if (protocols.includes(Protocols.Relay)) {
|
||||
const peers = this.relay.getMeshPeers(this.relay.pubSubTopic);
|
||||
|
||||
if (peers.length == 0) {
|
||||
// No peer yet available, wait for a subscription
|
||||
const promise = new Promise<void>((resolve) => {
|
||||
// TODO: Remove listeners once done
|
||||
this.relay.addEventListener("subscription-change", () => {
|
||||
// Remote peer subscribed to topic, now wait for a heartbeat
|
||||
// so that the mesh is updated and the remote peer added to it
|
||||
this.relay.addEventListener("gossipsub:heartbeat", () => resolve());
|
||||
});
|
||||
});
|
||||
promises.push(promise);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: This can be factored in one helper function
|
||||
// Probably need to add a "string" protocol to each class to make it easier
|
||||
if (protocols.includes(Protocols.Store)) {
|
||||
const storePromise = (async (): Promise<void> => {
|
||||
const peers = await this.store.peers();
|
||||
|
||||
if (peers.length) {
|
||||
log("Store peer found: ", peers[0].id.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const cb = (evt: CustomEvent<PeerProtocolsChangeData>): void => {
|
||||
for (const codec of Object.values(StoreCodecs)) {
|
||||
if (evt.detail.protocols.includes(codec)) {
|
||||
log("Resolving for", StoreCodecs, evt.detail.protocols);
|
||||
this.libp2p.peerStore.removeEventListener(
|
||||
"change:protocols",
|
||||
cb
|
||||
);
|
||||
resolve();
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
this.libp2p.peerStore.addEventListener("change:protocols", cb);
|
||||
});
|
||||
})();
|
||||
promises.push(storePromise);
|
||||
}
|
||||
|
||||
if (protocols.includes(Protocols.LightPush)) {
|
||||
const lightPushPromise = (async (): Promise<void> => {
|
||||
const peers = await this.lightPush.peers();
|
||||
|
||||
if (peers.length) {
|
||||
log("Light Push peer found: ", peers[0].id.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const cb = (evt: CustomEvent<PeerProtocolsChangeData>): void => {
|
||||
if (evt.detail.protocols.includes(LightPushCodec)) {
|
||||
log("Resolving for", LightPushCodec, evt.detail.protocols);
|
||||
this.libp2p.peerStore.removeEventListener("change:protocols", cb);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
this.libp2p.peerStore.addEventListener("change:protocols", cb);
|
||||
});
|
||||
})();
|
||||
promises.push(lightPushPromise);
|
||||
}
|
||||
|
||||
if (protocols.includes(Protocols.Filter)) {
|
||||
const filterPromise = (async (): Promise<void> => {
|
||||
const peers = await this.filter.peers();
|
||||
|
||||
if (peers.length) {
|
||||
log("Filter peer found: ", peers[0].id.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const cb = (evt: CustomEvent<PeerProtocolsChangeData>): void => {
|
||||
if (evt.detail.protocols.includes(FilterCodec)) {
|
||||
log("Resolving for", FilterCodec, evt.detail.protocols);
|
||||
this.libp2p.peerStore.removeEventListener("change:protocols", cb);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
this.libp2p.peerStore.addEventListener("change:protocols", cb);
|
||||
});
|
||||
})();
|
||||
promises.push(filterPromise);
|
||||
}
|
||||
|
||||
if (timeoutMs) {
|
||||
await rejectOnTimeout(
|
||||
Promise.all(promises),
|
||||
timeoutMs,
|
||||
"Timed out waiting for a remote peer."
|
||||
);
|
||||
} else {
|
||||
await Promise.all(promises);
|
||||
}
|
||||
}
|
||||
|
||||
private startKeepAlive(
|
||||
peerId: PeerId,
|
||||
pingPeriodSecs: number,
|
||||
|
@ -454,14 +328,3 @@ export class Waku {
|
|||
this.relayKeepAliveTimers = {};
|
||||
}
|
||||
}
|
||||
|
||||
const awaitTimeout = (ms: number, rejectReason: string): Promise<void> =>
|
||||
new Promise((_resolve, reject) => setTimeout(() => reject(rejectReason), ms));
|
||||
|
||||
async function rejectOnTimeout<T>(
|
||||
promise: Promise<T>,
|
||||
timeoutMs: number,
|
||||
rejectReason: string
|
||||
): Promise<void> {
|
||||
await Promise.race([promise, awaitTimeout(timeoutMs, rejectReason)]);
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ import debug from "debug";
|
|||
|
||||
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils";
|
||||
import { delay } from "../../test_utils/delay";
|
||||
import { waitForRemotePeer } from "../wait_for_remote_peer";
|
||||
import { createWaku, Protocols, Waku } from "../waku";
|
||||
import { WakuMessage } from "../waku_message";
|
||||
|
||||
|
@ -29,7 +30,7 @@ describe("Waku Filter", () => {
|
|||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.waitForRemotePeer([Protocols.Filter, Protocols.LightPush]);
|
||||
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
|
||||
});
|
||||
|
||||
it("creates a subscription", async function () {
|
||||
|
|
|
@ -3,6 +3,7 @@ import debug from "debug";
|
|||
|
||||
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils";
|
||||
import { delay } from "../../test_utils/delay";
|
||||
import { waitForRemotePeer } from "../wait_for_remote_peer";
|
||||
import { createWaku, Protocols, Waku } from "../waku";
|
||||
import { WakuMessage } from "../waku_message";
|
||||
|
||||
|
@ -30,7 +31,7 @@ describe("Waku Light Push [node only]", () => {
|
|||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.waitForRemotePeer([Protocols.LightPush]);
|
||||
await waitForRemotePeer(waku, [Protocols.LightPush]);
|
||||
|
||||
const messageText = "Light Push works!";
|
||||
const message = await WakuMessage.fromUtf8String(
|
||||
|
@ -67,7 +68,7 @@ describe("Waku Light Push [node only]", () => {
|
|||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.waitForRemotePeer([Protocols.LightPush]);
|
||||
await waitForRemotePeer(waku, [Protocols.LightPush]);
|
||||
|
||||
const nimPeerId = await nwaku.getPeerId();
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ import {
|
|||
getPublicKey,
|
||||
} from "../crypto";
|
||||
import { bytesToHex, bytesToUtf8, hexToBytes, utf8ToBytes } from "../utils";
|
||||
import { waitForRemotePeer } from "../wait_for_remote_peer";
|
||||
import { createWaku, Protocols, Waku } from "../waku";
|
||||
|
||||
import { DecryptionMethod, WakuMessage } from "./index";
|
||||
|
@ -41,7 +42,7 @@ describe("Waku Message [node only]", function () {
|
|||
dbg("Dialing to nwaku node");
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
dbg("Wait for remote peer");
|
||||
await waku.waitForRemotePeer([Protocols.Relay]);
|
||||
await waitForRemotePeer(waku, [Protocols.Relay]);
|
||||
dbg("Remote peer ready");
|
||||
// As this test uses the nwaku RPC API, we somehow often face
|
||||
// Race conditions where the nwaku node does not have the js-waku
|
||||
|
|
|
@ -15,6 +15,7 @@ import {
|
|||
generateSymmetricKey,
|
||||
getPublicKey,
|
||||
} from "../crypto";
|
||||
import { waitForRemotePeer } from "../wait_for_remote_peer";
|
||||
import { createWaku, Protocols, Waku } from "../waku";
|
||||
import { DecryptionMethod, WakuMessage } from "../waku_message";
|
||||
|
||||
|
@ -55,8 +56,8 @@ describe("Waku Relay [node only]", () => {
|
|||
|
||||
log("Wait for mutual pubsub subscription");
|
||||
await Promise.all([
|
||||
waku1.waitForRemotePeer([Protocols.Relay]),
|
||||
waku2.waitForRemotePeer([Protocols.Relay]),
|
||||
waitForRemotePeer(waku1, [Protocols.Relay]),
|
||||
waitForRemotePeer(waku2, [Protocols.Relay]),
|
||||
]);
|
||||
log("before each hook done");
|
||||
});
|
||||
|
@ -284,8 +285,8 @@ describe("Waku Relay [node only]", () => {
|
|||
);
|
||||
|
||||
await Promise.all([
|
||||
waku1.waitForRemotePeer([Protocols.Relay]),
|
||||
waku2.waitForRemotePeer([Protocols.Relay]),
|
||||
waitForRemotePeer(waku1, [Protocols.Relay]),
|
||||
waitForRemotePeer(waku2, [Protocols.Relay]),
|
||||
// No subscription change expected for Waku 3
|
||||
]);
|
||||
|
||||
|
@ -334,7 +335,7 @@ describe("Waku Relay [node only]", () => {
|
|||
await nwaku.start();
|
||||
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.waitForRemotePeer([Protocols.Relay]);
|
||||
await waitForRemotePeer(waku, [Protocols.Relay]);
|
||||
});
|
||||
|
||||
afterEach(async function () {
|
||||
|
@ -437,8 +438,8 @@ describe("Waku Relay [node only]", () => {
|
|||
|
||||
// Wait for identify protocol to finish
|
||||
await Promise.all([
|
||||
waku1.waitForRemotePeer([Protocols.Relay]),
|
||||
waku2.waitForRemotePeer([Protocols.Relay]),
|
||||
waitForRemotePeer(waku1, [Protocols.Relay]),
|
||||
waitForRemotePeer(waku2, [Protocols.Relay]),
|
||||
]);
|
||||
|
||||
await delay(2000);
|
||||
|
|
|
@ -12,6 +12,7 @@ import {
|
|||
generateSymmetricKey,
|
||||
getPublicKey,
|
||||
} from "../crypto";
|
||||
import { waitForRemotePeer } from "../wait_for_remote_peer";
|
||||
import { createWaku, Protocols, Waku } from "../waku";
|
||||
import { DecryptionMethod, WakuMessage } from "../waku_message";
|
||||
|
||||
|
@ -51,7 +52,7 @@ describe("Waku Store", () => {
|
|||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.waitForRemotePeer([Protocols.Store]);
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
const messages = await waku.store.queryHistory([]);
|
||||
|
||||
expect(messages?.length).eq(2);
|
||||
|
@ -84,7 +85,7 @@ describe("Waku Store", () => {
|
|||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.waitForRemotePeer([Protocols.Store]);
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
let messages: WakuMessage[] = [];
|
||||
|
||||
|
@ -124,7 +125,7 @@ describe("Waku Store", () => {
|
|||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.waitForRemotePeer([Protocols.Store]);
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
let messages: WakuMessage[] = [];
|
||||
const desiredMsgs = 14;
|
||||
|
@ -161,7 +162,7 @@ describe("Waku Store", () => {
|
|||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.waitForRemotePeer([Protocols.Store]);
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
const messages = await waku.store.queryHistory([], {
|
||||
pageDirection: PageDirection.FORWARD,
|
||||
|
@ -201,7 +202,7 @@ describe("Waku Store", () => {
|
|||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.waitForRemotePeer([Protocols.Store]);
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
const nimPeerId = await nwaku.getPeerId();
|
||||
|
||||
|
@ -282,7 +283,7 @@ describe("Waku Store", () => {
|
|||
|
||||
dbg("Waku nodes connected to nwaku");
|
||||
|
||||
await waku1.waitForRemotePeer([Protocols.LightPush]);
|
||||
await waitForRemotePeer(waku1, [Protocols.LightPush]);
|
||||
|
||||
dbg("Sending messages using light push");
|
||||
await Promise.all([
|
||||
|
@ -292,7 +293,7 @@ describe("Waku Store", () => {
|
|||
waku1.lightPush.push(clearMessage),
|
||||
]);
|
||||
|
||||
await waku2.waitForRemotePeer([Protocols.Store]);
|
||||
await waitForRemotePeer(waku2, [Protocols.Store]);
|
||||
|
||||
waku2.store.addDecryptionKey(symKey);
|
||||
|
||||
|
@ -386,7 +387,7 @@ describe("Waku Store", () => {
|
|||
|
||||
dbg("Waku nodes connected to nwaku");
|
||||
|
||||
await waku1.waitForRemotePeer([Protocols.LightPush]);
|
||||
await waitForRemotePeer(waku1, [Protocols.LightPush]);
|
||||
|
||||
dbg("Sending messages using light push");
|
||||
await Promise.all([
|
||||
|
@ -396,7 +397,7 @@ describe("Waku Store", () => {
|
|||
waku1.lightPush.push(clearMessage),
|
||||
]);
|
||||
|
||||
await waku2.waitForRemotePeer([Protocols.Store]);
|
||||
await waitForRemotePeer(waku2, [Protocols.Store]);
|
||||
|
||||
waku2.addDecryptionKey(symKey, {
|
||||
contentTopics: [encryptedSymmetricContentTopic],
|
||||
|
@ -460,7 +461,7 @@ describe("Waku Store", () => {
|
|||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.waitForRemotePeer([Protocols.Store]);
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
const nwakuPeerId = await nwaku.getPeerId();
|
||||
|
||||
|
|
Loading…
Reference in New Issue