2022-07-25 12:43:17 +10:00
|
|
|
import { Peer, PeerProtocolsChangeData } from "@libp2p/interface-peer-store";
|
2022-07-25 12:33:08 +10:00
|
|
|
import debug from "debug";
|
2022-07-25 12:43:17 +10:00
|
|
|
import type { Libp2p } from "libp2p";
|
2022-07-25 12:33:08 +10:00
|
|
|
|
|
|
|
|
import { StoreCodecs } from "./constants";
|
|
|
|
|
import { Protocols, Waku } from "./waku";
|
|
|
|
|
import { FilterCodec } from "./waku_filter";
|
|
|
|
|
import { LightPushCodec } from "./waku_light_push";
|
|
|
|
|
|
|
|
|
|
const log = debug("waku:wait-for-remote-peer");
|
|
|
|
|
|
2022-07-25 12:43:17 +10:00
|
|
|
/**
 * Minimal shape of a Waku protocol implementation (store, light push,
 * filter) that this module needs in order to wait for a suitable peer.
 */
interface WakuProtocol {
  /**
   * Resolves to the peers currently usable for this protocol; an empty
   * array means no such peer is known yet.
   */
  peers: () => Promise<Peer[]>;

  /**
   * The underlying libp2p node; its peer store is observed for
   * `change:protocols` events.
   */
  libp2p: Libp2p;
}
|
|
|
|
|
|
2022-07-25 12:33:08 +10:00
|
|
|
/**
 * Wait for a remote peer to be ready given the passed protocols.
 * Useful when using the [[CreateOptions.bootstrap]] with [[Waku.create]].
 *
 * @param waku The Waku Node
 * @param protocols The protocols that need to be enabled by remote peers.
 * @param timeoutMs A timeout value in milliseconds. A falsy value
 * (`undefined` or `0`) means no timeout is applied.
 *
 * @returns A promise that **resolves** if all desired protocols are fulfilled by
 * remote nodes, **rejects** if the timeoutMs is reached.
 *
 * @default Remote peer must have Waku Relay enabled and no time out is applied.
 */
export async function waitForRemotePeer(
  waku: Waku,
  protocols?: Protocols[],
  timeoutMs?: number
): Promise<void> {
  protocols = protocols ?? [Protocols.Relay];

  const promises: Promise<void>[] = [];

  // Detaches whatever relay listeners are still registered. It is a no-op
  // until the relay branch below installs listeners.
  let removeRelayListeners: () => void = () => undefined;

  if (protocols.includes(Protocols.Relay)) {
    const peers = waku.relay.getMeshPeers(waku.relay.pubSubTopic);

    if (peers.length === 0) {
      // No peer yet available, wait for a subscription
      const promise = new Promise<void>((resolve) => {
        const onHeartbeat = (): void => {
          waku.relay.removeEventListener("gossipsub:heartbeat", onHeartbeat);
          resolve();
        };

        const onSubscriptionChange = (): void => {
          // Only the first subscription change matters; detaching here also
          // prevents stacking one heartbeat listener per subscription event.
          waku.relay.removeEventListener(
            "subscription-change",
            onSubscriptionChange
          );
          // Remote peer subscribed to topic, now wait for a heartbeat
          // so that the mesh is updated and the remote peer added to it
          waku.relay.addEventListener("gossipsub:heartbeat", onHeartbeat);
        };

        removeRelayListeners = (): void => {
          waku.relay.removeEventListener(
            "subscription-change",
            onSubscriptionChange
          );
          waku.relay.removeEventListener("gossipsub:heartbeat", onHeartbeat);
        };

        waku.relay.addEventListener("subscription-change", onSubscriptionChange);
      });
      promises.push(promise);
    }
  }

  if (protocols.includes(Protocols.Store)) {
    promises.push(waitForConnectedPeer(waku.store, Object.values(StoreCodecs)));
  }

  if (protocols.includes(Protocols.LightPush)) {
    promises.push(waitForConnectedPeer(waku.lightPush, [LightPushCodec]));
  }

  if (protocols.includes(Protocols.Filter)) {
    promises.push(waitForConnectedPeer(waku.filter, [FilterCodec]));
  }

  try {
    if (timeoutMs) {
      await rejectOnTimeout(
        Promise.all(promises),
        timeoutMs,
        "Timed out waiting for a remote peer."
      );
    } else {
      await Promise.all(promises);
    }
  } finally {
    // Also covers the timeout path, where the relay promise never settled
    // and its listeners would otherwise stay attached to the relay.
    removeRelayListeners();
  }
}
|
|
|
|
|
|
2022-07-25 12:43:17 +10:00
|
|
|
/**
 * Wait until a peer usable for the given protocol is available.
 *
 * Resolves immediately when `waku.peers()` already returns at least one
 * peer. Otherwise it listens to the libp2p peer store and resolves on the
 * first `change:protocols` event whose protocol list contains one of
 * `codecs`; the listener is removed at that point.
 *
 * @param waku The protocol implementation to query for peers.
 * @param codecs Protocol identifiers, any one of which satisfies the wait.
 */
async function waitForConnectedPeer(
  waku: WakuProtocol,
  codecs: string[]
): Promise<void> {
  const knownPeers = await waku.peers();

  if (knownPeers.length > 0) {
    log(`${codecs} peer found: `, knownPeers[0].id.toString());
    return;
  }

  await new Promise<void>((resolve) => {
    const onProtocolsChange = (
      evt: CustomEvent<PeerProtocolsChangeData>
    ): void => {
      // First requested codec advertised in this event, if any.
      const matchedCodec = codecs.find((candidate) =>
        evt.detail.protocols.includes(candidate)
      );
      if (matchedCodec === undefined) {
        // Unrelated protocol change: keep listening.
        return;
      }

      log("Resolving for", matchedCodec, evt.detail.protocols);
      waku.libp2p.peerStore.removeEventListener(
        "change:protocols",
        onProtocolsChange
      );
      resolve();
    };

    waku.libp2p.peerStore.addEventListener(
      "change:protocols",
      onProtocolsChange
    );
  });
}
|
|
|
|
|
|
2022-07-25 12:33:08 +10:00
|
|
|
/**
 * Create a promise that never resolves and rejects with `rejectReason`
 * (the raw string, not an `Error`) after `ms` milliseconds.
 *
 * @param ms Delay before rejecting, in milliseconds.
 * @param rejectReason Value the promise rejects with.
 * @param signal Optional abort signal. Once aborted, the pending timer is
 * cleared and the returned promise stays pending forever.
 */
const awaitTimeout = (
  ms: number,
  rejectReason: string,
  signal?: AbortSignal
): Promise<void> =>
  new Promise((_resolve, reject) => {
    const timer = setTimeout(() => reject(rejectReason), ms);
    if (!signal) return;

    const cancel = (): void => clearTimeout(timer);
    if (signal.aborted) {
      // The abort event will not fire again, so cancel right away.
      cancel();
    } else {
      signal.addEventListener("abort", cancel, { once: true });
    }
  });

/**
 * Await `promise`, but reject with `rejectReason` if it has not settled
 * within `timeoutMs` milliseconds.
 *
 * The timer is always cancelled once the race is decided, so a promise that
 * settles early does not leave a pending timer behind (which would keep a
 * Node.js process alive until `timeoutMs` elapsed).
 *
 * @param promise The operation to wait for; its resolved value is discarded.
 * @param timeoutMs Maximum time to wait, in milliseconds.
 * @param rejectReason Rejection value used when the timeout wins the race.
 */
async function rejectOnTimeout<T>(
  promise: Promise<T>,
  timeoutMs: number,
  rejectReason: string
): Promise<void> {
  const controller = new AbortController();
  try {
    await Promise.race([
      promise,
      awaitTimeout(timeoutMs, rejectReason, controller.signal),
    ]);
  } finally {
    // Harmless if the timer already fired.
    controller.abort();
  }
}
|