37: Use peerStore event to determine if identify is done & Auto-subscribe r=D4nte a=D4nte




Co-authored-by: Franck Royer <franck@royer.one>
This commit is contained in:
bors[bot] 2021-04-20 05:26:29 +00:00 committed by GitHub
commit 5d71cb2969
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 124 additions and 174 deletions

View File

@ -3,11 +3,11 @@ import util from 'util';
import TCP from 'libp2p-tcp'; import TCP from 'libp2p-tcp';
import Multiaddr from 'multiaddr'; import Multiaddr from 'multiaddr';
import PeerId from 'peer-id';
import Waku from '../lib/waku'; import Waku from '../lib/waku';
import { WakuMessage } from '../lib/waku_message'; import { WakuMessage } from '../lib/waku_message';
import { RelayDefaultTopic } from '../lib/waku_relay'; import { RelayDefaultTopic } from '../lib/waku_relay';
import { StoreCodec } from '../lib/waku_store';
import { ChatMessage } from './chat_message'; import { ChatMessage } from './chat_message';
@ -20,22 +20,12 @@ const ChatContentTopic = 'dingpu';
listenAddresses: [opts.listenAddr], listenAddresses: [opts.listenAddr],
modules: { transport: [TCP] }, modules: { transport: [TCP] },
}); });
console.log('PeerId: ', waku.libp2p.peerId); console.log('PeerId: ', waku.libp2p.peerId.toB58String());
console.log('Listening on '); console.log('Listening on ');
waku.libp2p.multiaddrs.forEach((address) => { waku.libp2p.multiaddrs.forEach((address) => {
console.log(`\t- ${address}`); console.log(`\t- ${address}`);
}); });
// TODO: Automatically subscribe, tracked with
// https://github.com/status-im/js-waku/issues/17
await waku.relay.subscribe();
console.log('Subscribed to waku relay');
if (opts.staticNode) {
console.log(`Dialing ${opts.staticNode}`);
await waku.dial(opts.staticNode);
}
const rl = readline.createInterface({ const rl = readline.createInterface({
input: process.stdin, input: process.stdin,
output: process.stdout, output: process.stdout,
@ -60,13 +50,22 @@ const ChatContentTopic = 'dingpu';
} }
}); });
const staticNodeId = opts.staticNode?.getPeerId(); if (opts.staticNode) {
if (staticNodeId) { console.log(`Dialing ${opts.staticNode}`);
const storePeerId = PeerId.createFromB58String(staticNodeId); await waku.dial(opts.staticNode);
}
// If we connect to a peer with WakuStore, we run the protocol
// TODO: Instead of doing it `once` it should always be done but
// only new messages should be printed
waku.libp2p.peerStore.once(
'change:protocols',
async ({ peerId, protocols }) => {
if (protocols.includes(StoreCodec)) {
console.log( console.log(
`Retrieving archived messages from ${storePeerId.toB58String()}` `Retrieving archived messages from ${peerId.toB58String()}`
); );
const messages = await waku.store.queryHistory(storePeerId, [ const messages = await waku.store.queryHistory(peerId, [
ChatContentTopic, ChatContentTopic,
]); ]);
messages?.map((msg) => { messages?.map((msg) => {
@ -76,6 +75,8 @@ const ChatContentTopic = 'dingpu';
} }
}); });
} }
}
);
console.log('Ready to chat!'); console.log('Ready to chat!');
rl.prompt(); rl.prompt();
@ -84,7 +85,7 @@ const ChatContentTopic = 'dingpu';
const chatMessage = new ChatMessage(new Date(), nick, line); const chatMessage = new ChatMessage(new Date(), nick, line);
const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic); const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic);
await waku.relay.publish(msg); await waku.relay.send(msg);
} }
})(); })();

View File

@ -4,16 +4,11 @@ import { bytes } from 'libp2p-noise/dist/src/@types/basic';
import { Noise } from 'libp2p-noise/dist/src/noise'; import { Noise } from 'libp2p-noise/dist/src/noise';
import Websockets from 'libp2p-websockets'; import Websockets from 'libp2p-websockets';
import Multiaddr from 'multiaddr'; import Multiaddr from 'multiaddr';
import pTimeout from 'p-timeout';
import PeerId from 'peer-id'; import PeerId from 'peer-id';
import { delay } from './delay'; import { RelayCodec, WakuRelay } from './waku_relay';
import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay';
import { StoreCodec, WakuStore } from './waku_store'; import { StoreCodec, WakuStore } from './waku_store';
const WaitForIdentityFreqMs = 50;
const WaitForIdentityTimeoutMs = 2_000;
export interface CreateOptions { export interface CreateOptions {
listenAddresses: string[]; listenAddresses: string[];
staticNoiseKey: bytes | undefined; staticNoiseKey: bytes | undefined;
@ -26,11 +21,15 @@ export interface CreateOptions {
} }
export default class Waku { export default class Waku {
private constructor( public libp2p: Libp2p;
public libp2p: Libp2p, public relay: WakuRelay;
public relay: WakuRelay, public store: WakuStore;
public store: WakuStore
) {} private constructor(libp2p: Libp2p, store: WakuStore) {
this.libp2p = libp2p;
this.relay = (libp2p.pubsub as unknown) as WakuRelay;
this.store = store;
}
/** /**
* Create new waku node * Create new waku node
@ -69,7 +68,7 @@ export default class Waku {
connEncryption: [new Noise(opts.staticNoiseKey)], connEncryption: [new Noise(opts.staticNoiseKey)],
// eslint-disable-next-line @typescript-eslint/ban-ts-comment // eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: Type needs update // @ts-ignore: Type needs update
pubsub: WakuRelayPubsub, pubsub: WakuRelay,
}, },
}); });
@ -77,7 +76,7 @@ export default class Waku {
await libp2p.start(); await libp2p.start();
return new Waku(libp2p, new WakuRelay(libp2p.pubsub), wakuStore); return new Waku(libp2p, wakuStore);
} }
/** /**
@ -86,56 +85,10 @@ export default class Waku {
*/ */
async dial(peer: PeerId | Multiaddr | string) { async dial(peer: PeerId | Multiaddr | string) {
await this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]); await this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]);
const peerId = toPeerId(peer);
await this.waitForIdentify(
peerId,
WaitForIdentityFreqMs,
WaitForIdentityTimeoutMs
);
} }
async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) { async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) {
this.libp2p.peerStore.addressBook.set(peerId, multiaddr); this.libp2p.peerStore.addressBook.set(peerId, multiaddr);
await this.libp2p.dialProtocol(peerId, RelayCodec);
await this.waitForIdentify(
peerId,
WaitForIdentityFreqMs,
WaitForIdentityTimeoutMs
);
}
/**
* Wait for the identify protocol to be finished. This helps ensure
* we know what protocols the peer implements
* @param peerId
* @param frequencyMilliseconds
* @param maxTimeoutMilliseconds
* @throws If there is no known connection with this peer.
*/
async waitForIdentify(
peerId: PeerId,
frequencyMilliseconds: number,
maxTimeoutMilliseconds: number
): Promise<void> {
const checkProtocols = this._waitForIdentify.bind(
this,
peerId,
frequencyMilliseconds
)();
await pTimeout(checkProtocols, maxTimeoutMilliseconds);
}
async _waitForIdentify(peerId: PeerId, frequencyMilliseconds: number) {
while (true) {
const peer = this.libp2p.peerStore.get(peerId);
if (!peer) throw 'No connection to peer';
if (peer.protocols.length > 0) {
return;
} else {
await delay(frequencyMilliseconds);
}
}
} }
async stop() { async stop() {
@ -158,18 +111,3 @@ export default class Waku {
return multiAddrWithId; return multiAddrWithId;
} }
} }
function toPeerId(peer: PeerId | Multiaddr | string): PeerId {
if (typeof peer === 'string') {
peer = new Multiaddr(peer);
}
if (Multiaddr.isMultiaddr(peer)) {
try {
peer = PeerId.createFromB58String(peer.getPeerId());
} catch (err) {
throw `${peer} is not a valid peer type`;
}
}
return peer;
}

View File

@ -1,6 +1,7 @@
import Gossipsub from 'libp2p-gossipsub';
import { shuffle } from 'libp2p-gossipsub/src/utils'; import { shuffle } from 'libp2p-gossipsub/src/utils';
import { RelayCodec, WakuRelayPubsub } from './index'; import { RelayCodec } from './index';
/** /**
* Given a topic, returns up to count peers subscribed to that topic * Given a topic, returns up to count peers subscribed to that topic
@ -14,7 +15,7 @@ import { RelayCodec, WakuRelayPubsub } from './index';
* *
*/ */
export function getRelayPeers( export function getRelayPeers(
router: WakuRelayPubsub, router: Gossipsub,
topic: string, topic: string,
count: number, count: number,
filter: (id: string) => boolean = () => true filter: (id: string) => boolean = () => true

View File

@ -34,9 +34,6 @@ describe('Waku Relay', () => {
await waku1.dialWithMultiAddr(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); await waku1.dialWithMultiAddr(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);
await waku1.relay.subscribe();
await waku2.relay.subscribe();
await Promise.all([ await Promise.all([
new Promise((resolve) => new Promise((resolve) =>
waku1.libp2p.pubsub.once('pubsub:subscription-change', () => waku1.libp2p.pubsub.once('pubsub:subscription-change', () =>
@ -79,7 +76,7 @@ describe('Waku Relay', () => {
const receivedPromise = waitForNextData(waku2.libp2p.pubsub); const receivedPromise = waitForNextData(waku2.libp2p.pubsub);
await waku1.relay.publish(message); await waku1.relay.send(message);
const receivedMsg = await receivedPromise; const receivedMsg = await receivedPromise;
@ -107,7 +104,6 @@ describe('Waku Relay', () => {
nimWaku = new NimWaku(makeLogFileName(this)); nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start({ staticnode: multiAddrWithId }); await nimWaku.start({ staticnode: multiAddrWithId });
await waku.relay.subscribe();
await new Promise((resolve) => await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve) waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
); );
@ -132,7 +128,7 @@ describe('Waku Relay', () => {
const message = WakuMessage.fromUtf8String('This is a message'); const message = WakuMessage.fromUtf8String('This is a message');
await waku.relay.publish(message); await waku.relay.send(message);
let msgs = []; let msgs = [];
@ -182,7 +178,15 @@ describe('Waku Relay', () => {
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await waku.relay.subscribe(); // Wait for identify protocol to finish
await new Promise((resolve) => {
waku.libp2p.peerStore.once('change:protocols', resolve);
});
// Wait for one heartbeat to ensure mesh is updated
await new Promise((resolve) => {
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve);
});
}); });
afterEach(async function () { afterEach(async function () {
@ -200,15 +204,16 @@ describe('Waku Relay', () => {
}); });
it('Js publishes to nim', async function () { it('Js publishes to nim', async function () {
this.timeout(5000); this.timeout(30000);
const message = WakuMessage.fromUtf8String('This is a message'); const message = WakuMessage.fromUtf8String('This is a message');
await delay(1000);
await waku.relay.publish(message); await waku.relay.send(message);
let msgs = []; let msgs = [];
while (msgs.length === 0) { while (msgs.length === 0) {
console.log('Waiting for messages');
await delay(200); await delay(200);
msgs = await nimWaku.messages(); msgs = await nimWaku.messages();
} }
@ -239,13 +244,21 @@ describe('Waku Relay', () => {
}); });
}); });
describe('js to nim to js', function () { describe.skip('js to nim to js', function () {
let waku1: Waku; let waku1: Waku;
let waku2: Waku; let waku2: Waku;
let nimWaku: NimWaku; let nimWaku: NimWaku;
beforeEach(async function () { afterEach(async function () {
this.timeout(10_000); nimWaku ? nimWaku.stop() : null;
await Promise.all([
waku1 ? await waku1.stop() : null,
waku2 ? await waku2.stop() : null,
]);
});
it('Js publishes, other Js receives', async function () {
this.timeout(60_000);
[waku1, waku2] = await Promise.all([ [waku1, waku2] = await Promise.all([
Waku.create({ Waku.create({
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
@ -257,7 +270,7 @@ describe('Waku Relay', () => {
}), }),
]); ]);
nimWaku = new NimWaku(this.test?.ctx?.currentTest?.title + ''); nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start(); await nimWaku.start();
const nimWakuMultiaddr = await nimWaku.getMultiaddrWithId(); const nimWakuMultiaddr = await nimWaku.getMultiaddrWithId();
@ -266,7 +279,15 @@ describe('Waku Relay', () => {
waku2.dial(nimWakuMultiaddr), waku2.dial(nimWakuMultiaddr),
]); ]);
await Promise.all([waku1.relay.subscribe(), waku2.relay.subscribe()]); // Wait for identify protocol to finish
await Promise.all([
new Promise((resolve) =>
waku1.libp2p.peerStore.once('change:protocols', resolve)
),
new Promise((resolve) =>
waku2.libp2p.peerStore.once('change:protocols', resolve)
),
]);
await Promise.all([ await Promise.all([
new Promise((resolve) => new Promise((resolve) =>
@ -276,17 +297,8 @@ describe('Waku Relay', () => {
waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve) waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
), ),
]); ]);
});
afterEach(async function () { await delay(2000);
nimWaku ? nimWaku.stop() : null;
await Promise.all([
waku1 ? await waku1.stop() : null,
waku2 ? await waku2.stop() : null,
]);
});
it('Js publishes, other Js receives', async function () {
// Check that the two JS peers are NOT directly connected // Check that the two JS peers are NOT directly connected
expect( expect(
waku1.libp2p.peerStore.peers.has(waku2.libp2p.peerId.toB58String()) waku1.libp2p.peerStore.peers.has(waku2.libp2p.peerId.toB58String())
@ -300,8 +312,8 @@ describe('Waku Relay', () => {
const waku2ReceivedPromise = waitForNextData(waku2.libp2p.pubsub); const waku2ReceivedPromise = waitForNextData(waku2.libp2p.pubsub);
await waku1.relay.publish(message); await waku1.relay.send(message);
console.log('Waiting for message');
const waku2ReceivedMsg = await waku2ReceivedPromise; const waku2ReceivedMsg = await waku2ReceivedPromise;
expect(waku2ReceivedMsg.utf8Payload()).to.eq(msgStr); expect(waku2ReceivedMsg.utf8Payload()).to.eq(msgStr);

View File

@ -6,7 +6,7 @@ import {
messageIdToString, messageIdToString,
shuffle, shuffle,
} from 'libp2p-gossipsub/src/utils'; } from 'libp2p-gossipsub/src/utils';
import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; import { InMessage } from 'libp2p-interfaces/src/pubsub';
import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
import PeerId from 'peer-id'; import PeerId from 'peer-id';
@ -19,8 +19,7 @@ import { RelayHeartbeat } from './relay_heartbeat';
export * from './constants'; export * from './constants';
export * from './relay_heartbeat'; export * from './relay_heartbeat';
// This is the class to pass to libp2p as pubsub protocol export class WakuRelay extends Gossipsub {
export class WakuRelayPubsub extends Gossipsub {
heartbeat: RelayHeartbeat; heartbeat: RelayHeartbeat;
/** /**
@ -43,6 +42,28 @@ export class WakuRelayPubsub extends Gossipsub {
Object.assign(this, { multicodecs }); Object.assign(this, { multicodecs });
} }
/**
* Mounts the gossipsub protocol onto the libp2p node
* and subscribes to the default topic
* @override
* @returns {void}
*/
start() {
super.start();
super.subscribe(constants.RelayDefaultTopic);
}
/**
* Send Waku messages under default topic
* @override
* @param {WakuMessage} message
* @returns {Promise<void>}
*/
async send(message: WakuMessage) {
const msg = message.toBinary();
await super.publish(constants.RelayDefaultTopic, Buffer.from(msg));
}
/** /**
* Join topic * Join topic
* @param {string} topic * @param {string} topic
@ -291,18 +312,3 @@ export class WakuRelayPubsub extends Gossipsub {
}; };
} }
} }
// This class provides an interface to execute the waku relay protocol
export class WakuRelay {
constructor(private pubsub: Pubsub) {}
// At this stage we are always using the same topic so we do not pass it as a parameter
async subscribe() {
await this.pubsub.subscribe(constants.RelayDefaultTopic);
}
async publish(message: WakuMessage) {
const msg = message.toBinary();
await this.pubsub.publish(constants.RelayDefaultTopic, msg);
}
}

View File

@ -1,12 +1,7 @@
import { expect } from 'chai'; import { expect } from 'chai';
import TCP from 'libp2p-tcp'; import TCP from 'libp2p-tcp';
import { import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils';
makeLogFileName,
NimWaku,
NOISE_KEY_1,
NOISE_KEY_2,
} from '../../test_utils';
import Waku from '../waku'; import Waku from '../waku';
import { WakuMessage } from '../waku_message'; import { WakuMessage } from '../waku_message';
@ -14,25 +9,6 @@ describe('Waku Store', () => {
let waku: Waku; let waku: Waku;
let nimWaku: NimWaku; let nimWaku: NimWaku;
beforeEach(async function () {
this.timeout(5_000);
nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start({ store: true });
const waku0 = await Waku.create({
staticNoiseKey: NOISE_KEY_2,
modules: { transport: [TCP] },
});
await waku0.dial(await nimWaku.getMultiaddrWithId());
await waku0.relay.subscribe();
await new Promise((resolve) =>
waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
});
afterEach(async function () { afterEach(async function () {
nimWaku ? nimWaku.stop() : null; nimWaku ? nimWaku.stop() : null;
waku ? await waku.stop() : null; waku ? await waku.stop() : null;
@ -41,6 +17,9 @@ describe('Waku Store', () => {
it('Retrieves history', async function () { it('Retrieves history', async function () {
this.timeout(5_000); this.timeout(5_000);
nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start({ store: true });
for (let i = 0; i < 2; i++) { for (let i = 0; i < 2; i++) {
await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`)); await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`));
} }
@ -51,6 +30,11 @@ describe('Waku Store', () => {
}); });
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
// Wait for identify protocol to finish
await new Promise((resolve) => {
waku.libp2p.peerStore.once('change:protocols', resolve);
});
const nimPeerId = await nimWaku.getPeerId(); const nimPeerId = await nimWaku.getPeerId();
const messages = await waku.store.queryHistory(nimPeerId); const messages = await waku.store.queryHistory(nimPeerId);
@ -65,6 +49,9 @@ describe('Waku Store', () => {
it('Retrieves all historical elements in chronological order through paging', async function () { it('Retrieves all historical elements in chronological order through paging', async function () {
this.timeout(5_000); this.timeout(5_000);
nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start({ store: true });
for (let i = 0; i < 15; i++) { for (let i = 0; i < 15; i++) {
await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`)); await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`));
} }
@ -75,6 +62,11 @@ describe('Waku Store', () => {
}); });
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
// Wait for identify protocol to finish
await new Promise((resolve) => {
waku.libp2p.peerStore.once('change:protocols', resolve);
});
const nimPeerId = await nimWaku.getPeerId(); const nimPeerId = await nimWaku.getPeerId();
const messages = await waku.store.queryHistory(nimPeerId); const messages = await waku.store.queryHistory(nimPeerId);