Wait for identify protocol to finish when dialing

Removes most `delay()`.
This commit is contained in:
Franck Royer 2021-04-13 22:36:37 +10:00
parent 41fa29feac
commit 4b31a6aaba
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
7 changed files with 70 additions and 55 deletions

View File

@ -7,7 +7,6 @@ import PeerId from 'peer-id';
import Waku from '../lib/waku'; import Waku from '../lib/waku';
import { WakuMessage } from '../lib/waku_message'; import { WakuMessage } from '../lib/waku_message';
import { RelayDefaultTopic } from '../lib/waku_relay'; import { RelayDefaultTopic } from '../lib/waku_relay';
import { delay } from '../test_utils/';
import { ChatMessage } from './chat_message'; import { ChatMessage } from './chat_message';
@ -49,23 +48,11 @@ const ChatContentTopic = 'dingpu';
await waku.dial(opts.staticNode); await waku.dial(opts.staticNode);
} }
await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
// TODO: identify if it is possible to listen to an event to confirm dial
// finished instead of an arbitrary delay. Tracked with
// https://github.com/status-im/js-waku/issues/18
await delay(2000);
// TODO: Automatically subscribe, tracked with // TODO: Automatically subscribe, tracked with
// https://github.com/status-im/js-waku/issues/17 // https://github.com/status-im/js-waku/issues/17
await waku.relay.subscribe(); await waku.relay.subscribe();
console.log('Subscribed to waku relay'); console.log('Subscribed to waku relay');
await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
const staticNodeId = opts.staticNode?.getPeerId(); const staticNodeId = opts.staticNode?.getPeerId();
if (staticNodeId) { if (staticNodeId) {
const storePeerId = PeerId.createFromB58String(staticNodeId); const storePeerId = PeerId.createFromB58String(staticNodeId);

View File

@ -4,11 +4,16 @@ import { bytes } from 'libp2p-noise/dist/src/@types/basic';
import { Noise } from 'libp2p-noise/dist/src/noise'; import { Noise } from 'libp2p-noise/dist/src/noise';
import TCP from 'libp2p-tcp'; import TCP from 'libp2p-tcp';
import Multiaddr from 'multiaddr'; import Multiaddr from 'multiaddr';
import pTimeout from 'p-timeout';
import PeerId from 'peer-id'; import PeerId from 'peer-id';
import { delay } from './delay';
import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay'; import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay';
import { StoreCodec, WakuStore } from './waku_store'; import { StoreCodec, WakuStore } from './waku_store';
const WaitForIdentityFreqMs = 50;
const WaitForIdentityTimeoutMs = 2_000;
export interface CreateOptions { export interface CreateOptions {
listenAddresses: string[]; listenAddresses: string[];
staticNoiseKey: bytes | undefined; staticNoiseKey: bytes | undefined;
@ -63,12 +68,57 @@ export default class Waku {
* @param peer The peer to dial * @param peer The peer to dial
*/ */
async dial(peer: PeerId | Multiaddr | string) { async dial(peer: PeerId | Multiaddr | string) {
return this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]); await this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]);
const peerId = toPeerId(peer);
await this.waitForIdentify(
peerId,
WaitForIdentityFreqMs,
WaitForIdentityTimeoutMs
);
} }
async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) { async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) {
this.libp2p.peerStore.addressBook.set(peerId, multiaddr); this.libp2p.peerStore.addressBook.set(peerId, multiaddr);
await this.libp2p.dialProtocol(peerId, RelayCodec); await this.libp2p.dialProtocol(peerId, RelayCodec);
await this.waitForIdentify(
peerId,
WaitForIdentityFreqMs,
WaitForIdentityTimeoutMs
);
}
/**
* Wait for the identify protocol to be finished. This helps ensure
* we know what protocols the peer implements
* @param peerId
* @param frequencyMilliseconds
* @param maxTimeoutMilliseconds
* @throws If there is no known connection with this peer.
*/
async waitForIdentify(
peerId: PeerId,
frequencyMilliseconds: number,
maxTimeoutMilliseconds: number
): Promise<void> {
const checkProtocols = this._waitForIdentify.bind(
this,
peerId,
frequencyMilliseconds
)();
await pTimeout(checkProtocols, maxTimeoutMilliseconds);
}
async _waitForIdentify(peerId: PeerId, frequencyMilliseconds: number) {
while (true) {
const peer = this.libp2p.peerStore.get(peerId);
if (!peer) throw 'No connection to peer';
if (peer.protocols.length > 0) {
return;
} else {
await delay(frequencyMilliseconds);
}
}
} }
async stop() { async stop() {
@ -91,3 +141,18 @@ export default class Waku {
return multiAddrWithId; return multiAddrWithId;
} }
} }
function toPeerId(peer: PeerId | Multiaddr | string): PeerId {
if (typeof peer === 'string') {
peer = new Multiaddr(peer);
}
if (Multiaddr.isMultiaddr(peer)) {
try {
peer = PeerId.createFromB58String(peer.getPeerId());
} catch (err) {
throw `${peer} is not a valid peer type`;
}
}
return peer;
}

View File

@ -2,9 +2,9 @@ import { expect } from 'chai';
import Pubsub from 'libp2p-interfaces/src/pubsub'; import Pubsub from 'libp2p-interfaces/src/pubsub';
import { NOISE_KEY_1, NOISE_KEY_2 } from '../../test_utils/constants'; import { NOISE_KEY_1, NOISE_KEY_2 } from '../../test_utils/constants';
import { delay } from '../../test_utils/delay';
import { makeLogFileName } from '../../test_utils/log_file'; import { makeLogFileName } from '../../test_utils/log_file';
import { NimWaku } from '../../test_utils/nim_waku'; import { NimWaku } from '../../test_utils/nim_waku';
import { delay } from '../delay';
import Waku from '../waku'; import Waku from '../waku';
import { WakuMessage } from '../waku_message'; import { WakuMessage } from '../waku_message';
@ -27,15 +27,6 @@ describe('Waku Relay', () => {
await waku1.dialWithMultiAddr(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); await waku1.dialWithMultiAddr(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);
await Promise.all([
new Promise((resolve) =>
waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
new Promise((resolve) =>
waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
]);
await waku1.relay.subscribe(); await waku1.relay.subscribe();
await waku2.relay.subscribe(); await waku2.relay.subscribe();
@ -176,16 +167,7 @@ describe('Waku Relay', () => {
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await delay(100);
await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
await waku.relay.subscribe(); await waku.relay.subscribe();
await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
}); });
afterEach(async function () { afterEach(async function () {
@ -222,6 +204,8 @@ describe('Waku Relay', () => {
}); });
it('Nim publishes to js', async function () { it('Nim publishes to js', async function () {
await delay(200);
const message = WakuMessage.fromUtf8String('Here is another message.'); const message = WakuMessage.fromUtf8String('Here is another message.');
const receivedPromise = waitForNextData(waku.libp2p.pubsub); const receivedPromise = waitForNextData(waku.libp2p.pubsub);
@ -259,16 +243,6 @@ describe('Waku Relay', () => {
waku2.dial(nimWakuMultiaddr), waku2.dial(nimWakuMultiaddr),
]); ]);
await delay(100);
await Promise.all([
new Promise((resolve) =>
waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
new Promise((resolve) =>
waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
]);
await Promise.all([waku1.relay.subscribe(), waku2.relay.subscribe()]); await Promise.all([waku1.relay.subscribe(), waku2.relay.subscribe()]);
await Promise.all([ await Promise.all([

View File

@ -1,7 +1,6 @@
import { expect } from 'chai'; import { expect } from 'chai';
import { import {
delay,
makeLogFileName, makeLogFileName,
NimWaku, NimWaku,
NOISE_KEY_1, NOISE_KEY_1,
@ -23,11 +22,6 @@ describe('Waku Store', () => {
const waku0 = await Waku.create({ staticNoiseKey: NOISE_KEY_2 }); const waku0 = await Waku.create({ staticNoiseKey: NOISE_KEY_2 });
await waku0.dial(await nimWaku.getMultiaddrWithId()); await waku0.dial(await nimWaku.getMultiaddrWithId());
await delay(100);
await new Promise((resolve) =>
waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
await waku0.relay.subscribe(); await waku0.relay.subscribe();
await new Promise((resolve) => await new Promise((resolve) =>
@ -50,8 +44,6 @@ describe('Waku Store', () => {
waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 }); waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 });
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await delay(500);
const nimPeerId = await nimWaku.getPeerId(); const nimPeerId = await nimWaku.getPeerId();
const messages = await waku.store.queryHistory(nimPeerId); const messages = await waku.store.queryHistory(nimPeerId);
@ -73,8 +65,6 @@ describe('Waku Store', () => {
waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 }); waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 });
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await delay(500);
const nimPeerId = await nimWaku.getPeerId(); const nimPeerId = await nimWaku.getPeerId();
const messages = await waku.store.queryHistory(nimPeerId); const messages = await waku.store.queryHistory(nimPeerId);

View File

@ -1,7 +1,7 @@
import fs, { promises as asyncFs } from 'fs'; import fs, { promises as asyncFs } from 'fs';
import { promisify } from 'util'; import { promisify } from 'util';
import { delay } from './delay'; import { delay } from '../lib/delay';
export const existsAsync = (filepath: string) => export const existsAsync = (filepath: string) =>
asyncFs.access(filepath, fs.constants.F_OK); asyncFs.access(filepath, fs.constants.F_OK);

View File

@ -1,5 +1,4 @@
export * from './async_fs'; export * from './async_fs';
export * from './constants'; export * from './constants';
export * from './delay';
export * from './log_file'; export * from './log_file';
export * from './nim_waku'; export * from './nim_waku';