mirror of https://github.com/waku-org/js-waku.git
Use peerStore event to determine if identify is done
This commit is contained in:
parent
9d64ac84ad
commit
9a305033bc
|
@ -3,11 +3,11 @@ import util from 'util';
|
|||
|
||||
import TCP from 'libp2p-tcp';
|
||||
import Multiaddr from 'multiaddr';
|
||||
import PeerId from 'peer-id';
|
||||
|
||||
import Waku from '../lib/waku';
|
||||
import { WakuMessage } from '../lib/waku_message';
|
||||
import { RelayDefaultTopic } from '../lib/waku_relay';
|
||||
import { StoreCodec } from '../lib/waku_store';
|
||||
|
||||
import { ChatMessage } from './chat_message';
|
||||
|
||||
|
@ -20,7 +20,7 @@ const ChatContentTopic = 'dingpu';
|
|||
listenAddresses: [opts.listenAddr],
|
||||
modules: { transport: [TCP] },
|
||||
});
|
||||
console.log('PeerId: ', waku.libp2p.peerId);
|
||||
console.log('PeerId: ', waku.libp2p.peerId.toB58String());
|
||||
console.log('Listening on ');
|
||||
waku.libp2p.multiaddrs.forEach((address) => {
|
||||
console.log(`\t- ${address}`);
|
||||
|
@ -31,11 +31,6 @@ const ChatContentTopic = 'dingpu';
|
|||
await waku.relay.subscribe();
|
||||
console.log('Subscribed to waku relay');
|
||||
|
||||
if (opts.staticNode) {
|
||||
console.log(`Dialing ${opts.staticNode}`);
|
||||
await waku.dial(opts.staticNode);
|
||||
}
|
||||
|
||||
const rl = readline.createInterface({
|
||||
input: process.stdin,
|
||||
output: process.stdout,
|
||||
|
@ -60,23 +55,34 @@ const ChatContentTopic = 'dingpu';
|
|||
}
|
||||
});
|
||||
|
||||
const staticNodeId = opts.staticNode?.getPeerId();
|
||||
if (staticNodeId) {
|
||||
const storePeerId = PeerId.createFromB58String(staticNodeId);
|
||||
console.log(
|
||||
`Retrieving archived messages from ${storePeerId.toB58String()}`
|
||||
);
|
||||
const messages = await waku.store.queryHistory(storePeerId, [
|
||||
ChatContentTopic,
|
||||
]);
|
||||
messages?.map((msg) => {
|
||||
if (msg.payload) {
|
||||
const chatMsg = ChatMessage.decode(msg.payload);
|
||||
printMessage(chatMsg);
|
||||
}
|
||||
});
|
||||
if (opts.staticNode) {
|
||||
console.log(`Dialing ${opts.staticNode}`);
|
||||
await waku.dial(opts.staticNode);
|
||||
}
|
||||
|
||||
// If we connect to a peer with WakuStore, we run the protocol
|
||||
// TODO: Instead of doing it `once` it should always be done but
|
||||
// only new messages should be printed
|
||||
waku.libp2p.peerStore.once(
|
||||
'change:protocols',
|
||||
async ({ peerId, protocols }) => {
|
||||
if (protocols.includes(StoreCodec)) {
|
||||
console.log(
|
||||
`Retrieving archived messages from ${peerId.toB58String()}`
|
||||
);
|
||||
const messages = await waku.store.queryHistory(peerId, [
|
||||
ChatContentTopic,
|
||||
]);
|
||||
messages?.map((msg) => {
|
||||
if (msg.payload) {
|
||||
const chatMsg = ChatMessage.decode(msg.payload);
|
||||
printMessage(chatMsg);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
console.log('Ready to chat!');
|
||||
rl.prompt();
|
||||
for await (const line of rl) {
|
||||
|
|
|
@ -4,16 +4,11 @@ import { bytes } from 'libp2p-noise/dist/src/@types/basic';
|
|||
import { Noise } from 'libp2p-noise/dist/src/noise';
|
||||
import Websockets from 'libp2p-websockets';
|
||||
import Multiaddr from 'multiaddr';
|
||||
import pTimeout from 'p-timeout';
|
||||
import PeerId from 'peer-id';
|
||||
|
||||
import { delay } from './delay';
|
||||
import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay';
|
||||
import { StoreCodec, WakuStore } from './waku_store';
|
||||
|
||||
const WaitForIdentityFreqMs = 50;
|
||||
const WaitForIdentityTimeoutMs = 2_000;
|
||||
|
||||
export interface CreateOptions {
|
||||
listenAddresses: string[];
|
||||
staticNoiseKey: bytes | undefined;
|
||||
|
@ -86,56 +81,10 @@ export default class Waku {
|
|||
*/
|
||||
async dial(peer: PeerId | Multiaddr | string) {
|
||||
await this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]);
|
||||
const peerId = toPeerId(peer);
|
||||
await this.waitForIdentify(
|
||||
peerId,
|
||||
WaitForIdentityFreqMs,
|
||||
WaitForIdentityTimeoutMs
|
||||
);
|
||||
}
|
||||
|
||||
async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) {
|
||||
this.libp2p.peerStore.addressBook.set(peerId, multiaddr);
|
||||
await this.libp2p.dialProtocol(peerId, RelayCodec);
|
||||
await this.waitForIdentify(
|
||||
peerId,
|
||||
WaitForIdentityFreqMs,
|
||||
WaitForIdentityTimeoutMs
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the identify protocol to be finished. This helps ensure
|
||||
* we know what protocols the peer implements
|
||||
* @param peerId
|
||||
* @param frequencyMilliseconds
|
||||
* @param maxTimeoutMilliseconds
|
||||
* @throws If there is no known connection with this peer.
|
||||
*/
|
||||
async waitForIdentify(
|
||||
peerId: PeerId,
|
||||
frequencyMilliseconds: number,
|
||||
maxTimeoutMilliseconds: number
|
||||
): Promise<void> {
|
||||
const checkProtocols = this._waitForIdentify.bind(
|
||||
this,
|
||||
peerId,
|
||||
frequencyMilliseconds
|
||||
)();
|
||||
|
||||
await pTimeout(checkProtocols, maxTimeoutMilliseconds);
|
||||
}
|
||||
|
||||
async _waitForIdentify(peerId: PeerId, frequencyMilliseconds: number) {
|
||||
while (true) {
|
||||
const peer = this.libp2p.peerStore.get(peerId);
|
||||
if (!peer) throw 'No connection to peer';
|
||||
if (peer.protocols.length > 0) {
|
||||
return;
|
||||
} else {
|
||||
await delay(frequencyMilliseconds);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async stop() {
|
||||
|
@ -158,18 +107,3 @@ export default class Waku {
|
|||
return multiAddrWithId;
|
||||
}
|
||||
}
|
||||
|
||||
function toPeerId(peer: PeerId | Multiaddr | string): PeerId {
|
||||
if (typeof peer === 'string') {
|
||||
peer = new Multiaddr(peer);
|
||||
}
|
||||
|
||||
if (Multiaddr.isMultiaddr(peer)) {
|
||||
try {
|
||||
peer = PeerId.createFromB58String(peer.getPeerId());
|
||||
} catch (err) {
|
||||
throw `${peer} is not a valid peer type`;
|
||||
}
|
||||
}
|
||||
return peer;
|
||||
}
|
||||
|
|
|
@ -180,9 +180,19 @@ describe('Waku Relay', () => {
|
|||
nimWaku = new NimWaku(this.test?.ctx?.currentTest?.title + '');
|
||||
await nimWaku.start();
|
||||
|
||||
await waku.relay.subscribe();
|
||||
|
||||
await waku.dial(await nimWaku.getMultiaddrWithId());
|
||||
|
||||
await waku.relay.subscribe();
|
||||
// Wait for identify protocol to finish
|
||||
await new Promise((resolve) => {
|
||||
waku.libp2p.peerStore.once('change:protocols', resolve);
|
||||
});
|
||||
|
||||
// Wait for one heartbeat to ensure mesh is updated
|
||||
await new Promise((resolve) => {
|
||||
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve);
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(async function () {
|
||||
|
|
|
@ -1,12 +1,7 @@
|
|||
import { expect } from 'chai';
|
||||
import TCP from 'libp2p-tcp';
|
||||
|
||||
import {
|
||||
makeLogFileName,
|
||||
NimWaku,
|
||||
NOISE_KEY_1,
|
||||
NOISE_KEY_2,
|
||||
} from '../../test_utils';
|
||||
import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils';
|
||||
import Waku from '../waku';
|
||||
import { WakuMessage } from '../waku_message';
|
||||
|
||||
|
@ -14,25 +9,6 @@ describe('Waku Store', () => {
|
|||
let waku: Waku;
|
||||
let nimWaku: NimWaku;
|
||||
|
||||
beforeEach(async function () {
|
||||
this.timeout(5_000);
|
||||
|
||||
nimWaku = new NimWaku(makeLogFileName(this));
|
||||
await nimWaku.start({ store: true });
|
||||
|
||||
const waku0 = await Waku.create({
|
||||
staticNoiseKey: NOISE_KEY_2,
|
||||
modules: { transport: [TCP] },
|
||||
});
|
||||
await waku0.dial(await nimWaku.getMultiaddrWithId());
|
||||
|
||||
await waku0.relay.subscribe();
|
||||
|
||||
await new Promise((resolve) =>
|
||||
waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
|
||||
);
|
||||
});
|
||||
|
||||
afterEach(async function () {
|
||||
nimWaku ? nimWaku.stop() : null;
|
||||
waku ? await waku.stop() : null;
|
||||
|
@ -41,6 +17,9 @@ describe('Waku Store', () => {
|
|||
it('Retrieves history', async function () {
|
||||
this.timeout(5_000);
|
||||
|
||||
nimWaku = new NimWaku(makeLogFileName(this));
|
||||
await nimWaku.start({ store: true });
|
||||
|
||||
for (let i = 0; i < 2; i++) {
|
||||
await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`));
|
||||
}
|
||||
|
@ -51,6 +30,11 @@ describe('Waku Store', () => {
|
|||
});
|
||||
await waku.dial(await nimWaku.getMultiaddrWithId());
|
||||
|
||||
// Wait for identify protocol to finish
|
||||
await new Promise((resolve) => {
|
||||
waku.libp2p.peerStore.once('change:protocols', resolve);
|
||||
});
|
||||
|
||||
const nimPeerId = await nimWaku.getPeerId();
|
||||
|
||||
const messages = await waku.store.queryHistory(nimPeerId);
|
||||
|
@ -65,6 +49,9 @@ describe('Waku Store', () => {
|
|||
it('Retrieves all historical elements in chronological order through paging', async function () {
|
||||
this.timeout(5_000);
|
||||
|
||||
nimWaku = new NimWaku(makeLogFileName(this));
|
||||
await nimWaku.start({ store: true });
|
||||
|
||||
for (let i = 0; i < 15; i++) {
|
||||
await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`));
|
||||
}
|
||||
|
@ -75,6 +62,11 @@ describe('Waku Store', () => {
|
|||
});
|
||||
await waku.dial(await nimWaku.getMultiaddrWithId());
|
||||
|
||||
// Wait for identify protocol to finish
|
||||
await new Promise((resolve) => {
|
||||
waku.libp2p.peerStore.once('change:protocols', resolve);
|
||||
});
|
||||
|
||||
const nimPeerId = await nimWaku.getPeerId();
|
||||
|
||||
const messages = await waku.store.queryHistory(nimPeerId);
|
||||
|
|
Loading…
Reference in New Issue