diff --git a/src/lib/waku.node.spec.ts b/src/lib/waku.node.spec.ts index c3c3617a1c..d1ba993c23 100644 --- a/src/lib/waku.node.spec.ts +++ b/src/lib/waku.node.spec.ts @@ -1,5 +1,4 @@ import { expect } from 'chai'; -import debug from 'debug'; import PeerId from 'peer-id'; import { @@ -9,13 +8,10 @@ import { NOISE_KEY_2, } from '../test_utils/'; -import { delay } from './delay'; import { Protocols, Waku } from './waku'; import { WakuMessage } from './waku_message'; import { generateSymmetricKey } from './waku_message/version_1'; -const dbg = debug('waku:test'); - const TestContentTopic = '/test/1/waku/utf8'; describe('Waku Dial [node only]', function () { @@ -28,7 +24,7 @@ describe('Waku Dial [node only]', function () { waku ? await waku.stop() : null; }); - it.skip('js connects to nim', async function () { + it('js connects to nim', async function () { this.timeout(20_000); nimWaku = new NimWaku(makeLogFileName(this)); await nimWaku.start(); @@ -40,21 +36,6 @@ describe('Waku Dial [node only]', function () { await waku.dial(multiAddrWithId); await waku.waitForRemotePeer([Protocols.Relay]); - let nimPeers = await nimWaku.peers(); - while (nimPeers.length === 0) { - await delay(200); - nimPeers = await nimWaku.peers(); - dbg('nimPeers', nimPeers); - } - - expect(nimPeers).to.deep.equal([ - { - multiaddr: multiAddrWithId, - protocol: '/vac/waku/relay/2.0.0', - connected: true, - }, - ]); - const nimPeerId = await nimWaku.getPeerId(); const jsPeers = waku.libp2p.peerStore.peers; @@ -218,3 +199,67 @@ describe('Decryption Keys', () => { expect(receivedMsg.timestamp?.valueOf()).to.eq(messageTimestamp.valueOf()); }); }); + +describe('Wait for remote peer / get peers', function () { + let waku: Waku; + let nimWaku: NimWaku; + + afterEach(async function () { + nimWaku ? nimWaku.stop() : null; + waku ? await waku.stop() : null; + }); + + it('Relay', async function () { + this.timeout(20_000); + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start(); + const multiAddrWithId = await nimWaku.getMultiaddrWithId(); + + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.dial(multiAddrWithId); + await waku.waitForRemotePeer([Protocols.Relay]); + const peers = waku.relay.getPeers(); + const nimPeerId = multiAddrWithId.getPeerId(); + + expect(nimPeerId).to.not.be.undefined; + expect(peers.has(nimPeerId as string)).to.be.true; + }); + + it('Store', async function () { + this.timeout(20_000); + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ persistMessages: true }); + const multiAddrWithId = await nimWaku.getMultiaddrWithId(); + + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.dial(multiAddrWithId); + await waku.waitForRemotePeer([Protocols.Store]); + const peers = waku.store.peers.map((peer) => peer.id.toB58String()); + const nimPeerId = multiAddrWithId.getPeerId(); + + expect(nimPeerId).to.not.be.undefined; + expect(peers.includes(nimPeerId as string)).to.be.true; + }); + + it('LightPush', async function () { + this.timeout(20_000); + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ lightpush: true }); + const multiAddrWithId = await nimWaku.getMultiaddrWithId(); + + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.dial(multiAddrWithId); + await waku.waitForRemotePeer([Protocols.LightPush]); + const peers = waku.lightPush.peers.map((peer) => peer.id.toB58String()); + const nimPeerId = multiAddrWithId.getPeerId(); + + expect(nimPeerId).to.not.be.undefined; + expect(peers.includes(nimPeerId as string)).to.be.true; + }); +}); diff --git a/src/lib/waku.ts b/src/lib/waku.ts index fb7e632287..597f3cc345 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -18,7 +18,6 @@ import { Multiaddr, multiaddr } from 'multiaddr'; import PeerId from 'peer-id'; import { Bootstrap, BootstrapOptions } from './discovery'; -import { getPeersForProtocol } from './select_peer'; import { LightPushCodec, WakuLightPush } from './waku_light_push'; import { DecryptionMethod, WakuMessage } from './waku_message'; import { RelayCodecs, WakuRelay } from './waku_relay'; @@ -321,13 +320,21 @@ export class Waku { const promises = []; if (desiredProtocols.includes(Protocols.Relay)) { - const peers = []; + const peers = this.relay.getPeers(); - RelayCodecs.forEach((proto) => { - getPeersForProtocol(this.libp2p, proto).forEach((peer) => - peers.push(peer) - ); - }); + if (peers.size == 0) { + // No peer yet available, wait for a subscription + const promise = new Promise((resolve) => { + this.libp2p.pubsub.once('pubsub:subscription-change', () => { + resolve(); + }); + }); + promises.push(promise); + } + } + + if (desiredProtocols.includes(Protocols.Store)) { + const peers = this.store.peers; if (peers.length == 0) { // No peer available for this protocol, waiting to connect to one. @@ -335,63 +342,36 @@ export class Waku { this.libp2p.peerStore.on( 'change:protocols', ({ protocols: connectedPeerProtocols }) => { - RelayCodecs.forEach((relayProto) => { - if (connectedPeerProtocols.includes(relayProto)) { - // Relay peer is ready once subscription has happen. - this.libp2p.pubsub.once('pubsub:subscription-change', () => { - dbg('Resolving for', relayProto, connectedPeerProtocols); - resolve(); - }); - } - }); + if (connectedPeerProtocols.includes(StoreCodec)) { + dbg('Resolving for', StoreCodec, connectedPeerProtocols); + resolve(); + } } ); }); promises.push(promise); } + } - if (desiredProtocols.includes(Protocols.Store)) { - const peers = getPeersForProtocol(this.libp2p, StoreCodec); + if (desiredProtocols.includes(Protocols.LightPush)) { + const peers = this.lightPush.peers; - if (peers.length == 0) { - // No peer available for this protocol, waiting to connect to one. - const promise = new Promise((resolve) => { - this.libp2p.peerStore.on( - 'change:protocols', - ({ protocols: connectedPeerProtocols }) => { - if (connectedPeerProtocols.includes(StoreCodec)) { - dbg('Resolving for', StoreCodec, connectedPeerProtocols); - resolve(); - } + if (peers.length == 0) { + // No peer available for this protocol, waiting to connect to one. + const promise = new Promise((resolve) => { + this.libp2p.peerStore.on( + 'change:protocols', + ({ protocols: connectedPeerProtocols }) => { + if (connectedPeerProtocols.includes(LightPushCodec)) { + dbg('Resolving for', LightPushCodec, connectedPeerProtocols); + resolve(); } - ); - }); - promises.push(promise); - } + } + ); + }); + + promises.push(promise); } - - if (desiredProtocols.includes(Protocols.LightPush)) { - const peers = getPeersForProtocol(this.libp2p, LightPushCodec); - - if (peers.length == 0) { - // No peer available for this protocol, waiting to connect to one. - const promise = new Promise((resolve) => { - this.libp2p.peerStore.on( - 'change:protocols', - ({ protocols: connectedPeerProtocols }) => { - if (connectedPeerProtocols.includes(LightPushCodec)) { - dbg('Resolving for', LightPushCodec, connectedPeerProtocols); - resolve(); - } - } - ); - }); - - promises.push(promise); - } - } - - await Promise.all(promises); } await Promise.all(promises); diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index 0b6073752a..b4b1f677ad 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -202,7 +202,7 @@ export class WakuRelay extends Gossipsub { } /** - * Return the relay peers we are connected to and we would publish a message to + * Return the relay peers we are connected to, and we would publish a message to */ getPeers(): Set { return getRelayPeers(this, this.pubSubTopic, this._options.D, (id) => { diff --git a/src/test_utils/log_file.ts b/src/test_utils/log_file.ts index 4eca0e4bce..a861b68805 100644 --- a/src/test_utils/log_file.ts +++ b/src/test_utils/log_file.ts @@ -48,7 +48,7 @@ async function find(tail: Tail, line: string): Promise { } function clean(str: string): string { - return str.replace(/ /g, '_').replace(/[':()]/g, ''); + return str.replace(/ /g, '_').replace(/[':()/]/g, ''); } export function makeLogFileName(ctx: Context): string {