Wait for heartbeat before considering relay peer ready (#472)

This commit is contained in:
Franck R 2022-01-31 15:30:49 +11:00 committed by GitHub
parent 03491a892b
commit 47c5565371
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 75 additions and 53 deletions

View File

@ -1,4 +1,5 @@
import { expect } from 'chai';
import debug from 'debug';
import PeerId from 'peer-id';
import {
@ -12,6 +13,8 @@ import { Protocols, Waku } from './waku';
import { WakuMessage } from './waku_message';
import { generateSymmetricKey } from './waku_message/version_1';
const dbg = debug('waku:test');
const TestContentTopic = '/test/1/waku/utf8';
describe('Waku Dial [node only]', function () {
@ -20,8 +23,8 @@ describe('Waku Dial [node only]', function () {
let nimWaku: NimWaku;
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
waku ? await waku.stop() : null;
!!nimWaku && nimWaku.stop();
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
});
it('js connects to nim', async function () {
@ -48,8 +51,8 @@ describe('Waku Dial [node only]', function () {
let nimWaku: NimWaku;
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
waku ? await waku.stop() : null;
!!nimWaku && nimWaku.stop();
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
});
before(function () {
@ -83,8 +86,8 @@ describe('Waku Dial [node only]', function () {
let nimWaku: NimWaku;
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
waku ? await waku.stop() : null;
!!nimWaku && nimWaku.stop();
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
});
it('Passing an array', async function () {
@ -145,6 +148,7 @@ describe('Decryption Keys', () => {
let waku1: Waku;
let waku2: Waku;
beforeEach(async function () {
this.timeout(5000);
[waku1, waku2] = await Promise.all([
Waku.create({ staticNoiseKey: NOISE_KEY_1 }),
Waku.create({
@ -162,9 +166,8 @@ describe('Decryption Keys', () => {
});
afterEach(async function () {
this.timeout(5000);
await waku1.stop();
await waku2.stop();
!!waku1 && waku1.stop().catch((e) => console.log('Waku failed to stop', e));
!!waku2 && waku2.stop().catch((e) => console.log('Waku failed to stop', e));
});
it('Used by Waku Relay', async function () {
@ -205,8 +208,8 @@ describe('Wait for remote peer / get peers', function () {
let nimWaku: NimWaku;
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
waku ? await waku.stop() : null;
!!nimWaku && nimWaku.stop();
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
});
it('Relay', async function () {
@ -215,11 +218,15 @@ describe('Wait for remote peer / get peers', function () {
await nimWaku.start();
const multiAddrWithId = await nimWaku.getMultiaddrWithId();
dbg('Create');
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
});
dbg('Dial');
await waku.dial(multiAddrWithId);
dbg('waitForRemotePeer');
await waku.waitForRemotePeer([Protocols.Relay]);
dbg('Done, get peers');
const peers = waku.relay.getPeers();
const nimPeerId = multiAddrWithId.getPeerId();

View File

@ -326,7 +326,9 @@ export class Waku {
// No peer yet available, wait for a subscription
const promise = new Promise<void>((resolve) => {
this.libp2p.pubsub.once('pubsub:subscription-change', () => {
resolve();
// Remote peer subscribed to topic, now wait for a heartbeat
// so that the mesh is updated and the remote peer added to it
this.libp2p.pubsub.once('gossipsub:heartbeat', resolve);
});
});
promises.push(promise);

View File

@ -1,10 +1,13 @@
import { expect } from 'chai';
import debug from 'debug';
import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils';
import { delay } from '../delay';
import { Protocols, Waku } from '../waku';
import { WakuMessage } from '../waku_message';
const dbg = debug('waku:test:lightpush');
const TestContentTopic = '/test/1/waku-light-push/utf8';
describe('Waku Light Push [node only]', () => {
@ -12,8 +15,8 @@ describe('Waku Light Push [node only]', () => {
let nimWaku: NimWaku;
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
waku ? await waku.stop() : null;
!!nimWaku && nimWaku.stop();
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
});
it('Push successfully', async function () {
@ -72,13 +75,16 @@ describe('Waku Light Push [node only]', () => {
TestContentTopic
);
dbg('Send message via lightpush');
const pushResponse = await waku.lightPush.push(message, {
peerId: nimPeerId,
});
dbg('Ack received', pushResponse);
expect(pushResponse?.isSuccess).to.be.true;
let msgs: WakuMessage[] = [];
dbg('Waiting for message to show on nim-waku side');
while (msgs.length === 0) {
await delay(200);
msgs = await nimWaku.messages();

View File

@ -35,21 +35,19 @@ describe('Waku Message [node only]', function () {
});
nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start({ rpcPrivate: true });
dbg('Starting nim-waku node');
await nimWaku.start({ rpcPrivate: true, lightpush: true });
dbg('Dialing to nim-waku node');
await waku.dial(await nimWaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.Relay]);
let peers = await waku.relay.getPeers();
while (peers.size === 0) {
await delay(200);
peers = await waku.relay.getPeers();
}
dbg('Wait for remote peer');
await waku.waitForRemotePeer([Protocols.Relay, Protocols.LightPush]);
dbg('Remote peer ready');
});
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
waku ? await waku.stop() : null;
!!nimWaku && nimWaku.stop();
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
});
it('JS decrypts nim message [asymmetric, no signature]', async function () {
@ -86,11 +84,13 @@ describe('Waku Message [node only]', function () {
it('Js encrypts message for nim [asymmetric, no signature]', async function () {
this.timeout(5000);
dbg('Ask nim-waku to generate asymmetric key pair');
const keyPair = await nimWaku.getAsymmetricKeyPair();
const privateKey = hexToBuf(keyPair.privateKey);
const publicKey = hexToBuf(keyPair.publicKey);
const messageText = 'This is a message I am going to encrypt';
dbg('Encrypt message');
const message = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic,
@ -99,15 +99,18 @@ describe('Waku Message [node only]', function () {
}
);
dbg('Send message over relay');
await waku.relay.send(message);
let msgs: WakuRelayMessage[] = [];
while (msgs.length === 0) {
dbg('Wait for message to be seen by nim-waku');
await delay(200);
msgs = await nimWaku.getAsymmetricMessages(privateKey);
}
dbg('Check message content');
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(hexToBuf(msgs[0].payload).toString('utf-8')).to.equal(messageText);
});
@ -121,6 +124,7 @@ describe('Waku Message [node only]', function () {
payload: Buffer.from(messageText, 'utf-8').toString('hex'),
};
dbg('Generate symmetric key');
const symKey = generateSymmetricKey();
waku.relay.addDecryptionKey(symKey);
@ -131,10 +135,11 @@ describe('Waku Message [node only]', function () {
}
);
dbg('Post message');
dbg('Post message using nim-waku');
await nimWaku.postSymmetricMessage(message, symKey);
dbg('Wait for message to be received by js-waku');
const receivedMsg = await receivedMsgPromise;
dbg('Message received by js-waku');
expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(1);
@ -144,8 +149,9 @@ describe('Waku Message [node only]', function () {
it('Js encrypts message for nim [symmetric, no signature]', async function () {
this.timeout(5000);
dbg('Getting symmetric key from nim-waku');
const symKey = await nimWaku.getSymmetricKey();
dbg('Encrypting message with js-waku');
const messageText =
'This is a message I am going to encrypt with a symmetric key';
const message = await WakuMessage.fromUtf8String(
@ -155,13 +161,14 @@ describe('Waku Message [node only]', function () {
symKey: symKey,
}
);
dbg('Sending message over relay');
await waku.relay.send(message);
let msgs: WakuRelayMessage[] = [];
while (msgs.length === 0) {
await delay(200);
dbg('Getting messages from nim-waku');
msgs = await nimWaku.getSymmetricMessages(symKey);
}

View File

@ -55,9 +55,10 @@ describe('Waku Relay [node only]', () => {
});
afterEach(async function () {
this.timeout(5000);
await waku1.stop();
await waku2.stop();
!!waku1 &&
waku1.stop().catch((e) => console.log('Waku failed to stop', e));
!!waku2 &&
waku2.stop().catch((e) => console.log('Waku failed to stop', e));
});
it('Subscribe', async function () {
@ -322,8 +323,8 @@ describe('Waku Relay [node only]', () => {
});
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
waku ? await waku.stop() : null;
!!nimWaku && nimWaku.stop();
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
});
it('nim subscribes to js', async function () {
@ -392,11 +393,11 @@ describe('Waku Relay [node only]', () => {
let nimWaku: NimWaku;
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
await Promise.all([
waku1 ? await waku1.stop() : null,
waku2 ? await waku2.stop() : null,
]);
!!nimWaku && nimWaku.stop();
!!waku1 &&
waku1.stop().catch((e) => console.log('Waku failed to stop', e));
!!waku2 &&
waku2.stop().catch((e) => console.log('Waku failed to stop', e));
});
it('Js publishes, other Js receives', async function () {

View File

@ -385,10 +385,12 @@ export class WakuRelay extends Gossipsub {
});
// Publish messages to peers
const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]);
dbg(`Relay message to ${toSend.size} peers`);
toSend.forEach((id) => {
if (id === msg.from) {
return;
}
dbg('Relay message to', id);
this._sendRpc(id, rpc);
});
}

View File

@ -27,8 +27,8 @@ describe('Waku Store', () => {
let nimWaku: NimWaku;
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
waku ? await waku.stop() : null;
!!nimWaku && nimWaku.stop();
!!waku && waku.stop().catch((e) => console.log('Waku failed to stop', e));
});
it('Retrieves history', async function () {
@ -301,7 +301,8 @@ describe('Waku Store', () => {
expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
await Promise.all([waku1.stop(), waku2.stop()]);
!!waku1 && waku1.stop().catch((e) => console.log('Waku failed to stop', e));
!!waku2 && waku2.stop().catch((e) => console.log('Waku failed to stop', e));
});
it('Retrieves history with asymmetric & symmetric encrypted messages on different content topics', async function () {
@ -415,7 +416,8 @@ describe('Waku Store', () => {
expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
await Promise.all([waku1.stop(), waku2.stop()]);
!!waku1 && waku1.stop().catch((e) => console.log('Waku failed to stop', e));
!!waku2 && waku2.stop().catch((e) => console.log('Waku failed to stop', e));
});
it('Retrieves history using start and end time', async function () {

View File

@ -12,7 +12,6 @@ import debug from 'debug';
import { Multiaddr, multiaddr } from 'multiaddr';
import PeerId from 'peer-id';
import { delay } from '../lib/delay';
import { hexToBuf } from '../lib/utils';
import { DefaultPubSubTopic } from '../lib/waku';
import { WakuMessage } from '../lib/waku_message';
@ -141,16 +140,12 @@ export class NimWaku {
}
public stop(): void {
// If killed too fast the SIGINT may not be registered
delay(100).then(() => {
dbg(
`nim-waku ${
this.process ? this.process.pid : this.pid
} getting SIGINT at ${new Date().toLocaleTimeString()}`
);
this.process ? this.process.kill('SIGINT') : null;
this.process = undefined;
});
const pid = this.process ? this.process.pid : this.pid;
dbg(`nim-waku ${pid} getting SIGINT at ${new Date().toLocaleTimeString()}`);
if (!this.process) throw 'nim-waku process not set';
const res = this.process.kill('SIGINT');
dbg(`nim-waku ${pid} interrupted:`, res);
this.process = undefined;
}
async waitForLog(msg: string, timeout: number): Promise<void> {