From 47c55653716da7f2ccf9e77cc8d95313acd3672e Mon Sep 17 00:00:00 2001 From: Franck R Date: Mon, 31 Jan 2022 15:30:49 +1100 Subject: [PATCH] Wait for heartbeat before considering relay peer ready (#472) --- src/lib/waku.node.spec.ts | 29 +++++++++++------- src/lib/waku.ts | 4 ++- src/lib/waku_light_push/index.node.spec.ts | 10 +++++-- src/lib/waku_message/index.node.spec.ts | 35 +++++++++++++--------- src/lib/waku_relay/index.node.spec.ts | 21 ++++++------- src/lib/waku_relay/index.ts | 2 ++ src/lib/waku_store/index.node.spec.ts | 10 ++++--- src/test_utils/nim_waku.ts | 17 ++++------- 8 files changed, 75 insertions(+), 53 deletions(-) diff --git a/src/lib/waku.node.spec.ts b/src/lib/waku.node.spec.ts index d1ba993c23..455af33231 100644 --- a/src/lib/waku.node.spec.ts +++ b/src/lib/waku.node.spec.ts @@ -1,4 +1,5 @@ import { expect } from 'chai'; +import debug from 'debug'; import PeerId from 'peer-id'; import { @@ -12,6 +13,8 @@ import { Protocols, Waku } from './waku'; import { WakuMessage } from './waku_message'; import { generateSymmetricKey } from './waku_message/version_1'; +const dbg = debug('waku:test'); + const TestContentTopic = '/test/1/waku/utf8'; describe('Waku Dial [node only]', function () { @@ -20,8 +23,8 @@ describe('Waku Dial [node only]', function () { let nimWaku: NimWaku; afterEach(async function () { - nimWaku ? nimWaku.stop() : null; - waku ? await waku.stop() : null; + !!nimWaku && nimWaku.stop(); + !!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e)); }); it('js connects to nim', async function () { @@ -48,8 +51,8 @@ describe('Waku Dial [node only]', function () { let nimWaku: NimWaku; afterEach(async function () { - nimWaku ? nimWaku.stop() : null; - waku ? await waku.stop() : null; + !!nimWaku && nimWaku.stop(); + !!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e)); }); before(function () { @@ -83,8 +86,8 @@ describe('Waku Dial [node only]', function () { let nimWaku: NimWaku; afterEach(async function () { - nimWaku ? nimWaku.stop() : null; - waku ? await waku.stop() : null; + !!nimWaku && nimWaku.stop(); + !!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e)); }); it('Passing an array', async function () { @@ -145,6 +148,7 @@ describe('Decryption Keys', () => { let waku1: Waku; let waku2: Waku; beforeEach(async function () { + this.timeout(5000); [waku1, waku2] = await Promise.all([ Waku.create({ staticNoiseKey: NOISE_KEY_1 }), Waku.create({ @@ -162,9 +166,8 @@ describe('Decryption Keys', () => { }); afterEach(async function () { - this.timeout(5000); - await waku1.stop(); - await waku2.stop(); + !!waku1 && waku1.stop().catch((e) => console.log('Waku failed to stop', e)); + !!waku2 && waku2.stop().catch((e) => console.log('Waku failed to stop', e)); }); it('Used by Waku Relay', async function () { @@ -205,8 +208,8 @@ describe('Wait for remote peer / get peers', function () { let nimWaku: NimWaku; afterEach(async function () { - nimWaku ? nimWaku.stop() : null; - waku ? await waku.stop() : null; + !!nimWaku && nimWaku.stop(); + !!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e)); }); it('Relay', async function () { @@ -215,11 +218,15 @@ describe('Wait for remote peer / get peers', function () { await nimWaku.start(); const multiAddrWithId = await nimWaku.getMultiaddrWithId(); + dbg('Create'); waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, }); + dbg('Dial'); await waku.dial(multiAddrWithId); + dbg('waitForRemotePeer'); await waku.waitForRemotePeer([Protocols.Relay]); + dbg('Done, get peers'); const peers = waku.relay.getPeers(); const nimPeerId = multiAddrWithId.getPeerId(); diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 597f3cc345..e1e322d296 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -326,7 +326,9 @@ export class Waku { // No peer yet available, wait for a subscription const promise = new Promise((resolve) => { this.libp2p.pubsub.once('pubsub:subscription-change', () => { - resolve(); + // Remote peer subscribed to topic, now wait for a heartbeat + // so that the mesh is updated and the remote peer added to it + this.libp2p.pubsub.once('gossipsub:heartbeat', resolve); }); }); promises.push(promise); diff --git a/src/lib/waku_light_push/index.node.spec.ts b/src/lib/waku_light_push/index.node.spec.ts index fce98f5a77..731050eb2f 100644 --- a/src/lib/waku_light_push/index.node.spec.ts +++ b/src/lib/waku_light_push/index.node.spec.ts @@ -1,10 +1,13 @@ import { expect } from 'chai'; +import debug from 'debug'; import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils'; import { delay } from '../delay'; import { Protocols, Waku } from '../waku'; import { WakuMessage } from '../waku_message'; +const dbg = debug('waku:test:lightpush'); + const TestContentTopic = '/test/1/waku-light-push/utf8'; describe('Waku Light Push [node only]', () => { @@ -12,8 +15,8 @@ describe('Waku Light Push [node only]', () => { let nimWaku: NimWaku; afterEach(async function () { - nimWaku ? nimWaku.stop() : null; - waku ? await waku.stop() : null; + !!nimWaku && nimWaku.stop(); + !!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e)); }); it('Push successfully', async function () { @@ -72,13 +75,16 @@ describe('Waku Light Push [node only]', () => { TestContentTopic ); + dbg('Send message via lightpush'); const pushResponse = await waku.lightPush.push(message, { peerId: nimPeerId, }); + dbg('Ack received', pushResponse); expect(pushResponse?.isSuccess).to.be.true; let msgs: WakuMessage[] = []; + dbg('Waiting for message to show on nim-waku side'); while (msgs.length === 0) { await delay(200); msgs = await nimWaku.messages(); diff --git a/src/lib/waku_message/index.node.spec.ts b/src/lib/waku_message/index.node.spec.ts index c0aa8f72dd..a8b7e8fadb 100644 --- a/src/lib/waku_message/index.node.spec.ts +++ b/src/lib/waku_message/index.node.spec.ts @@ -35,21 +35,19 @@ describe('Waku Message [node only]', function () { }); nimWaku = new NimWaku(makeLogFileName(this)); - await nimWaku.start({ rpcPrivate: true }); + dbg('Starting nim-waku node'); + await nimWaku.start({ rpcPrivate: true, lightpush: true }); + dbg('Dialing to nim-waku node'); await waku.dial(await nimWaku.getMultiaddrWithId()); - await waku.waitForRemotePeer([Protocols.Relay]); - - let peers = await waku.relay.getPeers(); - while (peers.size === 0) { - await delay(200); - peers = await waku.relay.getPeers(); - } + dbg('Wait for remote peer'); + await waku.waitForRemotePeer([Protocols.Relay, Protocols.LightPush]); + dbg('Remote peer ready'); }); afterEach(async function () { - nimWaku ? nimWaku.stop() : null; - waku ? await waku.stop() : null; + !!nimWaku && nimWaku.stop(); + !!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e)); }); it('JS decrypts nim message [asymmetric, no signature]', async function () { @@ -86,11 +84,13 @@ describe('Waku Message [node only]', function () { it('Js encrypts message for nim [asymmetric, no signature]', async function () { this.timeout(5000); + dbg('Ask nim-waku to generate asymmetric key pair'); const keyPair = await nimWaku.getAsymmetricKeyPair(); const privateKey = hexToBuf(keyPair.privateKey); const publicKey = hexToBuf(keyPair.publicKey); const messageText = 'This is a message I am going to encrypt'; + dbg('Encrypt message'); const message = await WakuMessage.fromUtf8String( messageText, TestContentTopic, @@ -99,15 +99,18 @@ describe('Waku Message [node only]', function () { } ); + dbg('Send message over relay'); await waku.relay.send(message); let msgs: WakuRelayMessage[] = []; while (msgs.length === 0) { + dbg('Wait for message to be seen by nim-waku'); await delay(200); msgs = await nimWaku.getAsymmetricMessages(privateKey); } + dbg('Check message content'); expect(msgs[0].contentTopic).to.equal(message.contentTopic); expect(hexToBuf(msgs[0].payload).toString('utf-8')).to.equal(messageText); }); @@ -121,6 +124,7 @@ describe('Waku Message [node only]', function () { payload: Buffer.from(messageText, 'utf-8').toString('hex'), }; + dbg('Generate symmetric key'); const symKey = generateSymmetricKey(); waku.relay.addDecryptionKey(symKey); @@ -131,10 +135,11 @@ describe('Waku Message [node only]', function () { } ); - dbg('Post message'); + dbg('Post message using nim-waku'); await nimWaku.postSymmetricMessage(message, symKey); - + dbg('Wait for message to be received by js-waku'); const receivedMsg = await receivedMsgPromise; + dbg('Message received by js-waku'); expect(receivedMsg.contentTopic).to.eq(message.contentTopic); expect(receivedMsg.version).to.eq(1); @@ -144,8 +149,9 @@ describe('Waku Message [node only]', function () { it('Js encrypts message for nim [symmetric, no signature]', async function () { this.timeout(5000); + dbg('Getting symmetric key from nim-waku'); const symKey = await nimWaku.getSymmetricKey(); - + dbg('Encrypting message with js-waku'); const messageText = 'This is a message I am going to encrypt with a symmetric key'; const message = await WakuMessage.fromUtf8String( @@ -155,13 +161,14 @@ describe('Waku Message [node only]', function () { symKey: symKey, } ); - + dbg('Sending message over relay'); await waku.relay.send(message); let msgs: WakuRelayMessage[] = []; while (msgs.length === 0) { await delay(200); + dbg('Getting messages from nim-waku'); msgs = await nimWaku.getSymmetricMessages(symKey); } diff --git a/src/lib/waku_relay/index.node.spec.ts b/src/lib/waku_relay/index.node.spec.ts index 59e46194a8..92f0956742 100644 --- a/src/lib/waku_relay/index.node.spec.ts +++ b/src/lib/waku_relay/index.node.spec.ts @@ -55,9 +55,10 @@ describe('Waku Relay [node only]', () => { }); afterEach(async function () { - this.timeout(5000); - await waku1.stop(); - await waku2.stop(); + !!waku1 && + waku1.stop().catch((e) => console.log('Waku failed to stop', e)); + !!waku2 && + waku2.stop().catch((e) => console.log('Waku failed to stop', e)); }); it('Subscribe', async function () { @@ -322,8 +323,8 @@ describe('Waku Relay [node only]', () => { }); afterEach(async function () { - nimWaku ? nimWaku.stop() : null; - waku ? await waku.stop() : null; + !!nimWaku && nimWaku.stop(); + !!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e)); }); it('nim subscribes to js', async function () { @@ -392,11 +393,11 @@ describe('Waku Relay [node only]', () => { let nimWaku: NimWaku; afterEach(async function () { - nimWaku ? nimWaku.stop() : null; - await Promise.all([ - waku1 ? await waku1.stop() : null, - waku2 ? await waku2.stop() : null, - ]); + !!nimWaku && nimWaku.stop(); + !!waku1 && + waku1.stop().catch((e) => console.log('Waku failed to stop', e)); + !!waku2 && + waku2.stop().catch((e) => console.log('Waku failed to stop', e)); }); it('Js publishes, other Js receives', async function () { diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index b4b1f677ad..1005179b30 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -385,10 +385,12 @@ export class WakuRelay extends Gossipsub { }); // Publish messages to peers const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]); + dbg(`Relay message to ${toSend.size} peers`); toSend.forEach((id) => { if (id === msg.from) { return; } + dbg('Relay message to', id); this._sendRpc(id, rpc); }); } diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index 6db27c6aff..96754e5cc8 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -27,8 +27,8 @@ describe('Waku Store', () => { let nimWaku: NimWaku; afterEach(async function () { - nimWaku ? nimWaku.stop() : null; - waku ? await waku.stop() : null; + !!nimWaku && nimWaku.stop(); + !!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e)); }); it('Retrieves history', async function () { @@ -301,7 +301,8 @@ describe('Waku Store', () => { expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText); expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); - await Promise.all([waku1.stop(), waku2.stop()]); + !!waku1 && waku1.stop().catch((e) => console.log('Waku failed to stop', e)); + !!waku2 && waku2.stop().catch((e) => console.log('Waku failed to stop', e)); }); it('Retrieves history with asymmetric & symmetric encrypted messages on different content topics', async function () { @@ -415,7 +416,8 @@ describe('Waku Store', () => { expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText); expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); - await Promise.all([waku1.stop(), waku2.stop()]); + !!waku1 && waku1.stop().catch((e) => console.log('Waku failed to stop', e)); + !!waku2 && waku2.stop().catch((e) => console.log('Waku failed to stop', e)); }); it('Retrieves history using start and end time', async function () { diff --git a/src/test_utils/nim_waku.ts b/src/test_utils/nim_waku.ts index 4dbc7d1931..45acb94466 100644 --- a/src/test_utils/nim_waku.ts +++ b/src/test_utils/nim_waku.ts @@ -12,7 +12,6 @@ import debug from 'debug'; import { Multiaddr, multiaddr } from 'multiaddr'; import PeerId from 'peer-id'; -import { delay } from '../lib/delay'; import { hexToBuf } from '../lib/utils'; import { DefaultPubSubTopic } from '../lib/waku'; import { WakuMessage } from '../lib/waku_message'; @@ -141,16 +140,12 @@ export class NimWaku { } public stop(): void { - // If killed too fast the SIGINT may not be registered - delay(100).then(() => { - dbg( - `nim-waku ${ - this.process ? this.process.pid : this.pid - } getting SIGINT at ${new Date().toLocaleTimeString()}` - ); - this.process ? this.process.kill('SIGINT') : null; - this.process = undefined; - }); + const pid = this.process ? this.process.pid : this.pid; + dbg(`nim-waku ${pid} getting SIGINT at ${new Date().toLocaleTimeString()}`); + if (!this.process) throw 'nim-waku process not set'; + const res = this.process.kill('SIGINT'); + dbg(`nim-waku ${pid} interrupted:`, res); + this.process = undefined; } async waitForLog(msg: string, timeout: number): Promise {