From 9a305033bc4cb21c9f7849c0cb3aabfe703aa52c Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 15 Apr 2021 14:28:18 +1000 Subject: [PATCH] Use peerStore event to determine if identify is done --- src/chat/index.ts | 50 +++++++++++++----------- src/lib/waku.ts | 66 -------------------------------- src/lib/waku_relay/index.spec.ts | 12 +++++- src/lib/waku_store/index.spec.ts | 42 ++++++++------------ 4 files changed, 56 insertions(+), 114 deletions(-) diff --git a/src/chat/index.ts b/src/chat/index.ts index 614277d11b..7194524f5f 100644 --- a/src/chat/index.ts +++ b/src/chat/index.ts @@ -3,11 +3,11 @@ import util from 'util'; import TCP from 'libp2p-tcp'; import Multiaddr from 'multiaddr'; -import PeerId from 'peer-id'; import Waku from '../lib/waku'; import { WakuMessage } from '../lib/waku_message'; import { RelayDefaultTopic } from '../lib/waku_relay'; +import { StoreCodec } from '../lib/waku_store'; import { ChatMessage } from './chat_message'; @@ -20,7 +20,7 @@ const ChatContentTopic = 'dingpu'; listenAddresses: [opts.listenAddr], modules: { transport: [TCP] }, }); - console.log('PeerId: ', waku.libp2p.peerId); + console.log('PeerId: ', waku.libp2p.peerId.toB58String()); console.log('Listening on '); waku.libp2p.multiaddrs.forEach((address) => { console.log(`\t- ${address}`); @@ -31,11 +31,6 @@ const ChatContentTopic = 'dingpu'; await waku.relay.subscribe(); console.log('Subscribed to waku relay'); - if (opts.staticNode) { - console.log(`Dialing ${opts.staticNode}`); - await waku.dial(opts.staticNode); - } - const rl = readline.createInterface({ input: process.stdin, output: process.stdout, @@ -60,23 +55,34 @@ const ChatContentTopic = 'dingpu'; } }); - const staticNodeId = opts.staticNode?.getPeerId(); - if (staticNodeId) { - const storePeerId = PeerId.createFromB58String(staticNodeId); - console.log( - `Retrieving archived messages from ${storePeerId.toB58String()}` - ); - const messages = await waku.store.queryHistory(storePeerId, [ - ChatContentTopic, - ]); - messages?.map((msg) => { - if (msg.payload) { - const chatMsg = ChatMessage.decode(msg.payload); - printMessage(chatMsg); - } - }); + if (opts.staticNode) { + console.log(`Dialing ${opts.staticNode}`); + await waku.dial(opts.staticNode); } + // If we connect to a peer with WakuStore, we run the protocol + // TODO: Instead of doing it `once` it should always be done but + // only new messages should be printed + waku.libp2p.peerStore.once( + 'change:protocols', + async ({ peerId, protocols }) => { + if (protocols.includes(StoreCodec)) { + console.log( + `Retrieving archived messages from ${peerId.toB58String()}` + ); + const messages = await waku.store.queryHistory(peerId, [ + ChatContentTopic, + ]); + messages?.map((msg) => { + if (msg.payload) { + const chatMsg = ChatMessage.decode(msg.payload); + printMessage(chatMsg); + } + }); + } + } + ); + console.log('Ready to chat!'); rl.prompt(); for await (const line of rl) { diff --git a/src/lib/waku.ts b/src/lib/waku.ts index cfc5ccf898..5040743932 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -4,16 +4,11 @@ import { bytes } from 'libp2p-noise/dist/src/@types/basic'; import { Noise } from 'libp2p-noise/dist/src/noise'; import Websockets from 'libp2p-websockets'; import Multiaddr from 'multiaddr'; -import pTimeout from 'p-timeout'; import PeerId from 'peer-id'; -import { delay } from './delay'; import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay'; import { StoreCodec, WakuStore } from './waku_store'; -const WaitForIdentityFreqMs = 50; -const WaitForIdentityTimeoutMs = 2_000; - export interface CreateOptions { listenAddresses: string[]; staticNoiseKey: bytes | undefined; @@ -86,56 +81,10 @@ export default class Waku { */ async dial(peer: PeerId | Multiaddr | string) { await this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]); - const peerId = toPeerId(peer); - await this.waitForIdentify( - peerId, - WaitForIdentityFreqMs, - WaitForIdentityTimeoutMs - ); } async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) { this.libp2p.peerStore.addressBook.set(peerId, multiaddr); - await this.libp2p.dialProtocol(peerId, RelayCodec); - await this.waitForIdentify( - peerId, - WaitForIdentityFreqMs, - WaitForIdentityTimeoutMs - ); - } - - /** - * Wait for the identify protocol to be finished. This helps ensure - * we know what protocols the peer implements - * @param peerId - * @param frequencyMilliseconds - * @param maxTimeoutMilliseconds - * @throws If there is no known connection with this peer. - */ - async waitForIdentify( - peerId: PeerId, - frequencyMilliseconds: number, - maxTimeoutMilliseconds: number - ): Promise { - const checkProtocols = this._waitForIdentify.bind( - this, - peerId, - frequencyMilliseconds - )(); - - await pTimeout(checkProtocols, maxTimeoutMilliseconds); - } - - async _waitForIdentify(peerId: PeerId, frequencyMilliseconds: number) { - while (true) { - const peer = this.libp2p.peerStore.get(peerId); - if (!peer) throw 'No connection to peer'; - if (peer.protocols.length > 0) { - return; - } else { - await delay(frequencyMilliseconds); - } - } } async stop() { @@ -158,18 +107,3 @@ export default class Waku { return multiAddrWithId; } } - -function toPeerId(peer: PeerId | Multiaddr | string): PeerId { - if (typeof peer === 'string') { - peer = new Multiaddr(peer); - } - - if (Multiaddr.isMultiaddr(peer)) { - try { - peer = PeerId.createFromB58String(peer.getPeerId()); - } catch (err) { - throw `${peer} is not a valid peer type`; - } - } - return peer; -} diff --git a/src/lib/waku_relay/index.spec.ts b/src/lib/waku_relay/index.spec.ts index 5a5b735fec..d85ece6a13 100644 --- a/src/lib/waku_relay/index.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -180,9 +180,19 @@ describe('Waku Relay', () => { nimWaku = new NimWaku(this.test?.ctx?.currentTest?.title + ''); await nimWaku.start(); + await waku.relay.subscribe(); + await waku.dial(await nimWaku.getMultiaddrWithId()); - await waku.relay.subscribe(); + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + + // Wait for one heartbeat to ensure mesh is updated + await new Promise((resolve) => { + waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve); + }); }); afterEach(async function () { diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index 84ea481b0f..817059ff7d 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -1,12 +1,7 @@ import { expect } from 'chai'; import TCP from 'libp2p-tcp'; -import { - makeLogFileName, - NimWaku, - NOISE_KEY_1, - NOISE_KEY_2, -} from '../../test_utils'; +import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils'; import Waku from '../waku'; import { WakuMessage } from '../waku_message'; @@ -14,25 +9,6 @@ describe('Waku Store', () => { let waku: Waku; let nimWaku: NimWaku; - beforeEach(async function () { - this.timeout(5_000); - - nimWaku = new NimWaku(makeLogFileName(this)); - await nimWaku.start({ store: true }); - - const waku0 = await Waku.create({ - staticNoiseKey: NOISE_KEY_2, - modules: { transport: [TCP] }, - }); - await waku0.dial(await nimWaku.getMultiaddrWithId()); - - await waku0.relay.subscribe(); - - await new Promise((resolve) => - waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ); - }); - afterEach(async function () { nimWaku ? nimWaku.stop() : null; waku ? await waku.stop() : null; @@ -41,6 +17,9 @@ describe('Waku Store', () => { it('Retrieves history', async function () { this.timeout(5_000); + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ store: true }); + for (let i = 0; i < 2; i++) { await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`)); } @@ -51,6 +30,11 @@ describe('Waku Store', () => { }); await waku.dial(await nimWaku.getMultiaddrWithId()); + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + const nimPeerId = await nimWaku.getPeerId(); const messages = await waku.store.queryHistory(nimPeerId); @@ -65,6 +49,9 @@ describe('Waku Store', () => { it('Retrieves all historical elements in chronological order through paging', async function () { this.timeout(5_000); + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ store: true }); + for (let i = 0; i < 15; i++) { await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`)); } @@ -75,6 +62,11 @@ describe('Waku Store', () => { }); await waku.dial(await nimWaku.getMultiaddrWithId()); + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + const nimPeerId = await nimWaku.getPeerId(); const messages = await waku.store.queryHistory(nimPeerId);