diff --git a/CHANGELOG.md b/CHANGELOG.md index ec57f7c955..c8163325ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Test: Upgrade nim-waku node to v0.7. - Doc: Renamed "DappConnect" to "Waku Connect". - Docs: API Docs are now available at https://js-waku.wakuconnect.dev/. +- **Breaking**: Replace `waitForConnectedPeer` with `waitForRemotePeer`; the new method checks that the peer is ready before resolving the promise. ## [0.15.0] - 2022-01-17 diff --git a/examples/relay-reactjs-chat/src/App.js b/examples/relay-reactjs-chat/src/App.js index 3d9d16b5cb..5de7fdf257 100644 --- a/examples/relay-reactjs-chat/src/App.js +++ b/examples/relay-reactjs-chat/src/App.js @@ -28,7 +28,7 @@ function App() { Waku.create({ bootstrap: { default: true } }).then((waku) => { setWaku(waku); setWakuStatus('Connecting'); - waku.waitForConnectedPeer().then(() => { + waku.waitForRemotePeer().then(() => { setWakuStatus('Ready'); }); }); diff --git a/examples/store-reactjs-chat/src/App.js b/examples/store-reactjs-chat/src/App.js index 891ad2a632..3e05d62ed7 100644 --- a/examples/store-reactjs-chat/src/App.js +++ b/examples/store-reactjs-chat/src/App.js @@ -34,7 +34,7 @@ function App() { // We do not handle disconnection/re-connection in this example if (wakuStatus === 'Connected') return; - waku.waitForConnectedPeer().then(() => { + waku.waitForRemotePeer().then(() => { // We are now connected to a store node setWakuStatus('Connected'); }); diff --git a/examples/web-chat/src/App.tsx b/examples/web-chat/src/App.tsx index 02a81caf75..dae8d1db06 100644 --- a/examples/web-chat/src/App.tsx +++ b/examples/web-chat/src/App.tsx @@ -129,7 +129,7 @@ export default function App() { if (historicalMessagesRetrieved) return; const retrieveMessages = async () => { - await waku.waitForConnectedPeer(); + await waku.waitForRemotePeer(); console.log(`Retrieving archived messages}`); try { diff --git a/src/lib/waku.node.spec.ts b/src/lib/waku.node.spec.ts index 50b47d44de..c3c3617a1c 100644 --- a/src/lib/waku.node.spec.ts +++ b/src/lib/waku.node.spec.ts @@ -10,10 +10,9 @@ import { } from '../test_utils/'; import { delay } from './delay'; -import { Waku } from './waku'; +import { Protocols, Waku } from './waku'; import { WakuMessage } from './waku_message'; import { generateSymmetricKey } from './waku_message/version_1'; -import { RelayCodecs } from './waku_relay'; const dbg = debug('waku:test'); @@ -39,7 +38,7 @@ describe('Waku Dial [node only]', function () { staticNoiseKey: NOISE_KEY_1, }); await waku.dial(multiAddrWithId); - await waku.waitForConnectedPeer([RelayCodecs]); + await waku.waitForRemotePeer([Protocols.Relay]); let nimPeers = await nimWaku.peers(); while (nimPeers.length === 0) { @@ -176,16 +175,8 @@ describe('Decryption Keys', () => { waku1.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); await Promise.all([ - new Promise((resolve) => - waku1.libp2p.pubsub.once('pubsub:subscription-change', () => - resolve(null) - ) - ), - new Promise((resolve) => - waku2.libp2p.pubsub.once('pubsub:subscription-change', () => - resolve(null) - ) - ), + waku1.waitForRemotePeer([Protocols.Relay]), + waku2.waitForRemotePeer([Protocols.Relay]), ]); }); diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 25ac55bf4a..fb7e632287 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -13,7 +13,6 @@ import Websockets from 'libp2p-websockets'; // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore: No types available import filters from 'libp2p-websockets/src/filters'; -import { Peer } from 'libp2p/dist/src/peer-store'; import Ping from 'libp2p/src/ping'; import { Multiaddr, multiaddr } from 'multiaddr'; import PeerId from 'peer-id'; @@ -38,6 +37,12 @@ export const DefaultPubSubTopic = '/waku/2/default-waku/proto'; const dbg = debug('waku:waku'); +export enum Protocols { + Relay = 'relay', + Store = 'store', + LightPush = 'lightpush', +} + export interface CreateOptions { /** * The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}. @@ -305,46 +310,91 @@ export class Waku { } /** - * Wait to be connected to a peer. Useful when using the [[CreateOptions.bootstrap]] - * with [[Waku.create]]. The Promise resolves only once we are connected to a - * Store peer, Relay peer and Light Push peer. + * Wait for a remote peer to be ready given the passed protocols. + * Useful when using the [[CreateOptions.bootstrap]] with [[Waku.create]]. + * + * @default Remote peer must have Waku Store and Waku Relay enabled. */ - async waitForConnectedPeer(protocols?: string[][]): Promise { - const desiredProtocols = protocols ?? [ - [StoreCodec], - [LightPushCodec], - RelayCodecs, - ]; + async waitForRemotePeer(protocols?: Protocols[]): Promise { + const desiredProtocols = protocols ?? [Protocols.Relay, Protocols.Store]; - await Promise.all( - desiredProtocols.map((desiredProtocolVersions) => { - const peers = new Array(); - desiredProtocolVersions.forEach((proto) => { - getPeersForProtocol(this.libp2p, proto).forEach((peer) => - peers.push(peer) + const promises = []; + + if (desiredProtocols.includes(Protocols.Relay)) { + const peers = []; + + RelayCodecs.forEach((proto) => { + getPeersForProtocol(this.libp2p, proto).forEach((peer) => + peers.push(peer) + ); + }); + + if (peers.length == 0) { + // No peer available for this protocol, waiting to connect to one. + const promise = new Promise((resolve) => { + this.libp2p.peerStore.on( + 'change:protocols', + ({ protocols: connectedPeerProtocols }) => { + RelayCodecs.forEach((relayProto) => { + if (connectedPeerProtocols.includes(relayProto)) { + // Relay peer is ready once subscription has happen. + this.libp2p.pubsub.once('pubsub:subscription-change', () => { + dbg('Resolving for', relayProto, connectedPeerProtocols); + resolve(); + }); + } + }); + } ); }); + promises.push(promise); + } - if (peers.length > 0) { - return Promise.resolve(); - } else { + if (desiredProtocols.includes(Protocols.Store)) { + const peers = getPeersForProtocol(this.libp2p, StoreCodec); + + if (peers.length == 0) { // No peer available for this protocol, waiting to connect to one. - return new Promise((resolve) => { + const promise = new Promise((resolve) => { this.libp2p.peerStore.on( 'change:protocols', ({ protocols: connectedPeerProtocols }) => { - desiredProtocolVersions.forEach((desiredProto) => { - if (connectedPeerProtocols.includes(desiredProto)) { - dbg('Resolving for', desiredProto, connectedPeerProtocols); - resolve(); - } - }); + if (connectedPeerProtocols.includes(StoreCodec)) { + dbg('Resolving for', StoreCodec, connectedPeerProtocols); + resolve(); + } } ); }); + promises.push(promise); } - }) - ); + } + + if (desiredProtocols.includes(Protocols.LightPush)) { + const peers = getPeersForProtocol(this.libp2p, LightPushCodec); + + if (peers.length == 0) { + // No peer available for this protocol, waiting to connect to one. + const promise = new Promise((resolve) => { + this.libp2p.peerStore.on( + 'change:protocols', + ({ protocols: connectedPeerProtocols }) => { + if (connectedPeerProtocols.includes(LightPushCodec)) { + dbg('Resolving for', LightPushCodec, connectedPeerProtocols); + resolve(); + } + } + ); + }); + + promises.push(promise); + } + } + + await Promise.all(promises); + } + + await Promise.all(promises); } private startKeepAlive( diff --git a/src/lib/waku_light_push/index.node.spec.ts b/src/lib/waku_light_push/index.node.spec.ts index 81d123ee6a..fce98f5a77 100644 --- a/src/lib/waku_light_push/index.node.spec.ts +++ b/src/lib/waku_light_push/index.node.spec.ts @@ -2,7 +2,7 @@ import { expect } from 'chai'; import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils'; import { delay } from '../delay'; -import { Waku } from '../waku'; +import { Protocols, Waku } from '../waku'; import { WakuMessage } from '../waku_message'; const TestContentTopic = '/test/1/waku-light-push/utf8'; @@ -26,7 +26,7 @@ describe('Waku Light Push [node only]', () => { staticNoiseKey: NOISE_KEY_1, }); await waku.dial(await nimWaku.getMultiaddrWithId()); - await waku.waitForConnectedPeer(); + await waku.waitForRemotePeer([Protocols.LightPush]); const messageText = 'Light Push works!'; const message = await WakuMessage.fromUtf8String( @@ -62,7 +62,7 @@ describe('Waku Light Push [node only]', () => { staticNoiseKey: NOISE_KEY_1, }); await waku.dial(await nimWaku.getMultiaddrWithId()); - await waku.waitForConnectedPeer(); + await waku.waitForRemotePeer([Protocols.LightPush]); const nimPeerId = await nimWaku.getPeerId(); diff --git a/src/lib/waku_message/index.node.spec.ts b/src/lib/waku_message/index.node.spec.ts index 119c5ed80b..c0aa8f72dd 100644 --- a/src/lib/waku_message/index.node.spec.ts +++ b/src/lib/waku_message/index.node.spec.ts @@ -9,8 +9,7 @@ import { } from '../../test_utils'; import { delay } from '../delay'; import { hexToBuf } from '../utils'; -import { Waku } from '../waku'; -import { RelayCodecs } from '../waku_relay'; +import { Protocols, Waku } from '../waku'; import { generatePrivateKey, @@ -39,7 +38,7 @@ describe('Waku Message [node only]', function () { await nimWaku.start({ rpcPrivate: true }); await waku.dial(await nimWaku.getMultiaddrWithId()); - await waku.waitForConnectedPeer([RelayCodecs]); + await waku.waitForRemotePeer([Protocols.Relay]); let peers = await waku.relay.getPeers(); while (peers.size === 0) { diff --git a/src/lib/waku_relay/index.node.spec.ts b/src/lib/waku_relay/index.node.spec.ts index f4427fca46..59e46194a8 100644 --- a/src/lib/waku_relay/index.node.spec.ts +++ b/src/lib/waku_relay/index.node.spec.ts @@ -8,7 +8,7 @@ import { NOISE_KEY_2, } from '../../test_utils'; import { delay } from '../delay'; -import { DefaultPubSubTopic, Waku } from '../waku'; +import { DefaultPubSubTopic, Protocols, Waku } from '../waku'; import { DecryptionMethod, WakuMessage } from '../waku_message'; import { generatePrivateKey, @@ -16,8 +16,6 @@ import { getPublicKey, } from '../waku_message/version_1'; -import { RelayCodecs } from './constants'; - const log = debug('waku:test'); const TestContentTopic = '/test/1/waku-relay/utf8'; @@ -50,16 +48,8 @@ describe('Waku Relay [node only]', () => { log('Wait for mutual pubsub subscription'); await Promise.all([ - new Promise((resolve) => - waku1.libp2p.pubsub.once('pubsub:subscription-change', () => - resolve(null) - ) - ), - new Promise((resolve) => - waku2.libp2p.pubsub.once('pubsub:subscription-change', () => - resolve(null) - ) - ), + waku1.waitForRemotePeer([Protocols.Relay]), + waku2.waitForRemotePeer([Protocols.Relay]), ]); log('before each hook done'); }); @@ -279,16 +269,8 @@ describe('Waku Relay [node only]', () => { waku3.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); await Promise.all([ - new Promise((resolve) => - waku1.libp2p.pubsub.once('pubsub:subscription-change', () => - resolve(null) - ) - ), - new Promise((resolve) => - waku2.libp2p.pubsub.once('pubsub:subscription-change', () => - resolve(null) - ) - ), + waku1.waitForRemotePeer([Protocols.Relay]), + waku2.waitForRemotePeer([Protocols.Relay]), // No subscription change expected for Waku 3 ]); @@ -336,12 +318,7 @@ describe('Waku Relay [node only]', () => { await nimWaku.start(); await waku.dial(await nimWaku.getMultiaddrWithId()); - await waku.waitForConnectedPeer([RelayCodecs]); - - // Wait for one heartbeat to ensure mesh is updated - await new Promise((resolve) => { - waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve); - }); + await waku.waitForRemotePeer([Protocols.Relay]); }); afterEach(async function () { @@ -444,17 +421,8 @@ describe('Waku Relay [node only]', () => { // Wait for identify protocol to finish await Promise.all([ - waku1.waitForConnectedPeer([RelayCodecs]), - waku2.waitForConnectedPeer([RelayCodecs]), - ]); - - await Promise.all([ - new Promise((resolve) => - waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ), - new Promise((resolve) => - waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ), + waku1.waitForRemotePeer([Protocols.Relay]), + waku2.waitForRemotePeer([Protocols.Relay]), ]); await delay(2000); diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index aa5489e76a..6db27c6aff 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -8,7 +8,7 @@ import { NOISE_KEY_2, } from '../../test_utils'; import { delay } from '../delay'; -import { Waku } from '../waku'; +import { Protocols, Waku } from '../waku'; import { DecryptionMethod, WakuMessage } from '../waku_message'; import { generatePrivateKey, @@ -18,8 +18,6 @@ import { import { PageDirection } from './history_rpc'; -import { StoreCodec } from './index'; - const dbg = debug('waku:test:store'); const TestContentTopic = '/test/1/waku-store/utf8'; @@ -51,7 +49,7 @@ describe('Waku Store', () => { staticNoiseKey: NOISE_KEY_1, }); await waku.dial(await nimWaku.getMultiaddrWithId()); - await waku.waitForConnectedPeer([[StoreCodec]]); + await waku.waitForRemotePeer([Protocols.Store]); const messages = await waku.store.queryHistory([]); expect(messages?.length).eq(2); @@ -81,7 +79,7 @@ describe('Waku Store', () => { staticNoiseKey: NOISE_KEY_1, }); await waku.dial(await nimWaku.getMultiaddrWithId()); - await waku.waitForConnectedPeer([[StoreCodec]]); + await waku.waitForRemotePeer([Protocols.Store]); let messages: WakuMessage[] = []; @@ -118,7 +116,7 @@ describe('Waku Store', () => { staticNoiseKey: NOISE_KEY_1, }); await waku.dial(await nimWaku.getMultiaddrWithId()); - await waku.waitForConnectedPeer([[StoreCodec]]); + await waku.waitForRemotePeer([Protocols.Store]); let messages: WakuMessage[] = []; const desiredMsgs = 14; @@ -152,7 +150,7 @@ describe('Waku Store', () => { staticNoiseKey: NOISE_KEY_1, }); await waku.dial(await nimWaku.getMultiaddrWithId()); - await waku.waitForConnectedPeer([[StoreCodec]]); + await waku.waitForRemotePeer([Protocols.Store]); const messages = await waku.store.queryHistory([], { pageDirection: PageDirection.FORWARD, @@ -189,7 +187,7 @@ describe('Waku Store', () => { staticNoiseKey: NOISE_KEY_1, }); await waku.dial(await nimWaku.getMultiaddrWithId()); - await waku.waitForConnectedPeer([[StoreCodec]]); + await waku.waitForRemotePeer([Protocols.Store]); const nimPeerId = await nimWaku.getPeerId(); @@ -453,7 +451,7 @@ describe('Waku Store', () => { staticNoiseKey: NOISE_KEY_1, }); await waku.dial(await nimWaku.getMultiaddrWithId()); - await waku.waitForConnectedPeer([[StoreCodec]]); + await waku.waitForRemotePeer([Protocols.Store]); const nimPeerId = await nimWaku.getPeerId();