From 4b31a6aabace70d35bbf1950e284055700177233 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Tue, 13 Apr 2021 22:36:37 +1000 Subject: [PATCH] Wait for identify protocol to finish when dialing Removes most `delay()`. --- src/chat/index.ts | 13 ------- src/{test_utils => lib}/delay.ts | 0 src/lib/waku.ts | 67 +++++++++++++++++++++++++++++++- src/lib/waku_relay/index.spec.ts | 32 ++------------- src/lib/waku_store/index.spec.ts | 10 ----- src/test_utils/async_fs.ts | 2 +- src/test_utils/index.ts | 1 - 7 files changed, 70 insertions(+), 55 deletions(-) rename src/{test_utils => lib}/delay.ts (100%) diff --git a/src/chat/index.ts b/src/chat/index.ts index b2aef93765..c555c5db43 100644 --- a/src/chat/index.ts +++ b/src/chat/index.ts @@ -7,7 +7,6 @@ import PeerId from 'peer-id'; import Waku from '../lib/waku'; import { WakuMessage } from '../lib/waku_message'; import { RelayDefaultTopic } from '../lib/waku_relay'; -import { delay } from '../test_utils/'; import { ChatMessage } from './chat_message'; @@ -49,23 +48,11 @@ const ChatContentTopic = 'dingpu'; await waku.dial(opts.staticNode); } - await new Promise((resolve) => - waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ); - - // TODO: identify if it is possible to listen to an event to confirm dial - // finished instead of an arbitrary delay. Tracked with - // https://github.com/status-im/js-waku/issues/18 - await delay(2000); // TODO: Automatically subscribe, tracked with // https://github.com/status-im/js-waku/issues/17 await waku.relay.subscribe(); console.log('Subscribed to waku relay'); - await new Promise((resolve) => - waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ); - const staticNodeId = opts.staticNode?.getPeerId(); if (staticNodeId) { const storePeerId = PeerId.createFromB58String(staticNodeId); diff --git a/src/test_utils/delay.ts b/src/lib/delay.ts similarity index 100% rename from src/test_utils/delay.ts rename to src/lib/delay.ts diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 357d84537e..b8c63fd7d4 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -4,11 +4,16 @@ import { bytes } from 'libp2p-noise/dist/src/@types/basic'; import { Noise } from 'libp2p-noise/dist/src/noise'; import TCP from 'libp2p-tcp'; import Multiaddr from 'multiaddr'; +import pTimeout from 'p-timeout'; import PeerId from 'peer-id'; +import { delay } from './delay'; import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay'; import { StoreCodec, WakuStore } from './waku_store'; +const WaitForIdentityFreqMs = 50; +const WaitForIdentityTimeoutMs = 2_000; + export interface CreateOptions { listenAddresses: string[]; staticNoiseKey: bytes | undefined; @@ -63,12 +68,57 @@ export default class Waku { * @param peer The peer to dial */ async dial(peer: PeerId | Multiaddr | string) { - return this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]); + await this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]); + const peerId = toPeerId(peer); + await this.waitForIdentify( + peerId, + WaitForIdentityFreqMs, + WaitForIdentityTimeoutMs + ); } async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) { this.libp2p.peerStore.addressBook.set(peerId, multiaddr); await this.libp2p.dialProtocol(peerId, RelayCodec); + await this.waitForIdentify( + peerId, + WaitForIdentityFreqMs, + WaitForIdentityTimeoutMs + ); + } + + /** + * Wait for the identify protocol to be finished. This helps ensure + * we know what protocols the peer implements + * @param peerId + * @param frequencyMilliseconds + * @param maxTimeoutMilliseconds + * @throws If there is no known connection with this peer. + */ + async waitForIdentify( + peerId: PeerId, + frequencyMilliseconds: number, + maxTimeoutMilliseconds: number + ): Promise { + const checkProtocols = this._waitForIdentify.bind( + this, + peerId, + frequencyMilliseconds + )(); + + await pTimeout(checkProtocols, maxTimeoutMilliseconds); + } + + async _waitForIdentify(peerId: PeerId, frequencyMilliseconds: number) { + while (true) { + const peer = this.libp2p.peerStore.get(peerId); + if (!peer) throw 'No connection to peer'; + if (peer.protocols.length > 0) { + return; + } else { + await delay(frequencyMilliseconds); + } + } } async stop() { @@ -91,3 +141,18 @@ export default class Waku { return multiAddrWithId; } } + +function toPeerId(peer: PeerId | Multiaddr | string): PeerId { + if (typeof peer === 'string') { + peer = new Multiaddr(peer); + } + + if (Multiaddr.isMultiaddr(peer)) { + try { + peer = PeerId.createFromB58String(peer.getPeerId()); + } catch (err) { + throw `${peer} is not a valid peer type`; + } + } + return peer; +} diff --git a/src/lib/waku_relay/index.spec.ts b/src/lib/waku_relay/index.spec.ts index 2b27b16ed3..98da87731f 100644 --- a/src/lib/waku_relay/index.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -2,9 +2,9 @@ import { expect } from 'chai'; import Pubsub from 'libp2p-interfaces/src/pubsub'; import { NOISE_KEY_1, NOISE_KEY_2 } from '../../test_utils/constants'; -import { delay } from '../../test_utils/delay'; import { makeLogFileName } from '../../test_utils/log_file'; import { NimWaku } from '../../test_utils/nim_waku'; +import { delay } from '../delay'; import Waku from '../waku'; import { WakuMessage } from '../waku_message'; @@ -27,15 +27,6 @@ describe('Waku Relay', () => { await waku1.dialWithMultiAddr(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); - await Promise.all([ - new Promise((resolve) => - waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ), - new Promise((resolve) => - waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ), - ]); - await waku1.relay.subscribe(); await waku2.relay.subscribe(); @@ -176,16 +167,7 @@ describe('Waku Relay', () => { await waku.dial(await nimWaku.getMultiaddrWithId()); - await delay(100); - await new Promise((resolve) => - waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ); - await waku.relay.subscribe(); - - await new Promise((resolve) => - waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ); }); afterEach(async function () { @@ -222,6 +204,8 @@ describe('Waku Relay', () => { }); it('Nim publishes to js', async function () { + await delay(200); + const message = WakuMessage.fromUtf8String('Here is another message.'); const receivedPromise = waitForNextData(waku.libp2p.pubsub); @@ -259,16 +243,6 @@ describe('Waku Relay', () => { waku2.dial(nimWakuMultiaddr), ]); - await delay(100); - await Promise.all([ - new Promise((resolve) => - waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ), - new Promise((resolve) => - waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ), - ]); - await Promise.all([waku1.relay.subscribe(), waku2.relay.subscribe()]); await Promise.all([ diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index ccc21a89ac..3a3d593c32 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -1,7 +1,6 @@ import { expect } from 'chai'; import { - delay, makeLogFileName, NimWaku, NOISE_KEY_1, @@ -23,11 +22,6 @@ describe('Waku Store', () => { const waku0 = await Waku.create({ staticNoiseKey: NOISE_KEY_2 }); await waku0.dial(await nimWaku.getMultiaddrWithId()); - await delay(100); - await new Promise((resolve) => - waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ); - await waku0.relay.subscribe(); await new Promise((resolve) => @@ -50,8 +44,6 @@ describe('Waku Store', () => { waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 }); await waku.dial(await nimWaku.getMultiaddrWithId()); - await delay(500); - const nimPeerId = await nimWaku.getPeerId(); const messages = await waku.store.queryHistory(nimPeerId); @@ -73,8 +65,6 @@ describe('Waku Store', () => { waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 }); await waku.dial(await nimWaku.getMultiaddrWithId()); - await delay(500); - const nimPeerId = await nimWaku.getPeerId(); const messages = await waku.store.queryHistory(nimPeerId); diff --git a/src/test_utils/async_fs.ts b/src/test_utils/async_fs.ts index 40f4d3549c..ebc63689f9 100644 --- a/src/test_utils/async_fs.ts +++ b/src/test_utils/async_fs.ts @@ -1,7 +1,7 @@ import fs, { promises as asyncFs } from 'fs'; import { promisify } from 'util'; -import { delay } from './delay'; +import { delay } from '../lib/delay'; export const existsAsync = (filepath: string) => asyncFs.access(filepath, fs.constants.F_OK); diff --git a/src/test_utils/index.ts b/src/test_utils/index.ts index 62a0c6e5aa..59bbdc199b 100644 --- a/src/test_utils/index.ts +++ b/src/test_utils/index.ts @@ -1,5 +1,4 @@ export * from './async_fs'; export * from './constants'; -export * from './delay'; export * from './log_file'; export * from './nim_waku';