From d7b08f7e2440050a53c072561b81da4e1275f79a Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Mon, 25 Jul 2022 13:13:47 +1000 Subject: [PATCH] fix: Wait for remote peer for Waku Relay The promise resolves only if a remote peer is added to the gossipsub mesh. --- package-lock.json | 41 ++++++++++++++++++++++ package.json | 1 + src/lib/wait_for_remote_peer.node.spec.ts | 5 ++- src/lib/wait_for_remote_peer.ts | 42 +++++++++++++++-------- src/lib/waku_relay/index.ts | 5 +-- 5 files changed, 72 insertions(+), 22 deletions(-) diff --git a/package-lock.json b/package-lock.json index e09cd2d4f8..0447bd7b3b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -32,6 +32,7 @@ "js-sha3": "^0.8.0", "libp2p": "next", "multiformats": "^9.6.5", + "p-event": "^5.0.1", "protons-runtime": "^1.0.4", "uint8arrays": "^3.0.0", "uuid": "^8.3.2" @@ -9798,6 +9799,31 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-event": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/p-event/-/p-event-5.0.1.tgz", + "integrity": "sha512-dd589iCQ7m1L0bmC5NLlVYfy3TbBEsMUfWx9PyAgPeIcFZ/E2yaTZ4Rz4MiBmmJShviiftHVXOqfnfzJ6kyMrQ==", + "dependencies": { + "p-timeout": "^5.0.2" + }, + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/p-event/node_modules/p-timeout": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.1.0.tgz", + "integrity": "sha512-auFDyzzzGZZZdHz3BtET9VEz0SE/uMEAx7uWfGPucfzEwwe/xH0iVeZibQmANYE/hp9T2+UUZT5m+BKyrDp3Ew==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-fifo": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/p-fifo/-/p-fifo-1.0.0.tgz", @@ -20602,6 +20628,21 @@ "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-4.0.0.tgz", "integrity": "sha512-Vb3QRvQ0Y5XnF40ZUWW7JfLogicVh/EnA5gBIvKDJoYpeI82+1E3AlB9yOcKFS0AhHrWVnAQO39fbR0G99IVEQ==" }, + "p-event": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/p-event/-/p-event-5.0.1.tgz", + "integrity": "sha512-dd589iCQ7m1L0bmC5NLlVYfy3TbBEsMUfWx9PyAgPeIcFZ/E2yaTZ4Rz4MiBmmJShviiftHVXOqfnfzJ6kyMrQ==", + "requires": { + "p-timeout": "^5.0.2" + }, + "dependencies": { + "p-timeout": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.1.0.tgz", + "integrity": "sha512-auFDyzzzGZZZdHz3BtET9VEz0SE/uMEAx7uWfGPucfzEwwe/xH0iVeZibQmANYE/hp9T2+UUZT5m+BKyrDp3Ew==" + } + } + }, "p-fifo": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/p-fifo/-/p-fifo-1.0.0.tgz", diff --git a/package.json b/package.json index 16e0dec3eb..2d545fc25d 100644 --- a/package.json +++ b/package.json @@ -89,6 +89,7 @@ "js-sha3": "^0.8.0", "libp2p": "next", "multiformats": "^9.6.5", + "p-event": "^5.0.1", "protons-runtime": "^1.0.4", "uint8arrays": "^3.0.0", "uuid": "^8.3.2" diff --git a/src/lib/wait_for_remote_peer.node.spec.ts b/src/lib/wait_for_remote_peer.node.spec.ts index 06bf3e92b3..92c1ea489a 100644 --- a/src/lib/wait_for_remote_peer.node.spec.ts +++ b/src/lib/wait_for_remote_peer.node.spec.ts @@ -36,7 +36,7 @@ describe("Wait for remote peer", function () { await waku.dial(multiAddrWithId); await delay(1000); await waitForRemotePeer(waku, [Protocols.Relay]); - const peers = waku.relay.getPeers(); + const peers = waku.relay.getMeshPeers(); const nimPeerId = multiAddrWithId.getPeerId(); expect(nimPeerId).to.not.be.undefined; @@ -64,8 +64,7 @@ describe("Wait for remote peer", function () { await waku.dial(multiAddrWithId); await waitPromise; - // TODO: Should getMeshPeers be used instead? - const peers = waku.relay.getPeers(); + const peers = waku.relay.getMeshPeers(); const nimPeerId = multiAddrWithId.getPeerId(); expect(nimPeerId).to.not.be.undefined; diff --git a/src/lib/wait_for_remote_peer.ts b/src/lib/wait_for_remote_peer.ts index 826e2ea0b9..e52d9c4139 100644 --- a/src/lib/wait_for_remote_peer.ts +++ b/src/lib/wait_for_remote_peer.ts @@ -1,6 +1,8 @@ +import type { GossipSub } from "@chainsafe/libp2p-gossipsub"; import { Peer, PeerProtocolsChangeData } from "@libp2p/interface-peer-store"; import debug from "debug"; import type { Libp2p } from "libp2p"; +import { pEvent } from "p-event"; import { StoreCodecs } from "./constants"; import { Protocols, Waku } from "./waku"; @@ -14,10 +16,18 @@ interface WakuProtocol { peers: () => Promise; } +interface WakuGossipSubProtocol extends GossipSub { + getMeshPeers: () => string[]; +} + /** * Wait for a remote peer to be ready given the passed protocols. * Useful when using the [[CreateOptions.bootstrap]] with [[Waku.create]]. * + * If the passed protocols is a GossipSub protocol, then it resolves only once + * a peer is in a mesh, to help ensure that other peers will send and receive + * message to us. + * * @param waku The Waku Node * @param protocols The protocols that need to be enabled by remote peers. * @param timeoutMs A timeout value in milliseconds.. @@ -34,23 +44,10 @@ export async function waitForRemotePeer( ): Promise { protocols = protocols ?? [Protocols.Relay]; - const promises: Promise[] = []; + const promises = []; if (protocols.includes(Protocols.Relay)) { - const peers = waku.relay.getMeshPeers(waku.relay.pubSubTopic); - - if (peers.length == 0) { - // No peer yet available, wait for a subscription - const promise = new Promise((resolve) => { - // TODO: Remove listeners once done - waku.relay.addEventListener("subscription-change", () => { - // Remote peer subscribed to topic, now wait for a heartbeat - // so that the mesh is updated and the remote peer added to it - waku.relay.addEventListener("gossipsub:heartbeat", () => resolve()); - }); - }); - promises.push(promise); - } + promises.push(waitForGossipSubPeerInMesh(waku.relay)); } if (protocols.includes(Protocols.Store)) { @@ -105,6 +102,21 @@ async function waitForConnectedPeer( }); } +/** + * Wait for a peer with the given protocol to be connected and in the gossipsub + * mesh. + */ +async function waitForGossipSubPeerInMesh( + waku: WakuGossipSubProtocol +): Promise { + let peers = waku.getMeshPeers(); + + while (peers.length == 0) { + await pEvent(waku, "gossipsub:heartbeat"); + peers = waku.getMeshPeers(); + } +} + const awaitTimeout = (ms: number, rejectReason: string): Promise => new Promise((_resolve, reject) => setTimeout(() => reject(rejectReason), ms)); diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index a46ce15407..297bfd3fdc 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -210,9 +210,6 @@ export class WakuRelay extends GossipSub { } getMeshPeers(topic?: TopicStr): PeerIdStr[] { - return super.getMeshPeers(topic ?? DefaultPubSubTopic); + return super.getMeshPeers(topic ?? this.pubSubTopic); } - - // TODO: Implement method that uses Relay codec - // public async heartbeat(): Promise { }