Replace `waitForConnectedPeer` with `waitForRemotePeer` (#469)

This commit is contained in:
Franck R 2022-01-30 21:56:21 +11:00 committed by GitHub
parent fcfa9e28db
commit d9248dbc56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 106 additions and 99 deletions

View File

@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Test: Upgrade nim-waku node to v0.7. - Test: Upgrade nim-waku node to v0.7.
- Doc: Renamed "DappConnect" to "Waku Connect". - Doc: Renamed "DappConnect" to "Waku Connect".
- Docs: API Docs are now available at https://js-waku.wakuconnect.dev/. - Docs: API Docs are now available at https://js-waku.wakuconnect.dev/.
- **Breaking**: Replace `waitForConnectedPeer` with `waitForRemotePeer`; the new method checks that the peer is ready before resolving the promise.
## [0.15.0] - 2022-01-17 ## [0.15.0] - 2022-01-17

View File

@ -28,7 +28,7 @@ function App() {
Waku.create({ bootstrap: { default: true } }).then((waku) => { Waku.create({ bootstrap: { default: true } }).then((waku) => {
setWaku(waku); setWaku(waku);
setWakuStatus('Connecting'); setWakuStatus('Connecting');
waku.waitForConnectedPeer().then(() => { waku.waitForRemotePeer().then(() => {
setWakuStatus('Ready'); setWakuStatus('Ready');
}); });
}); });

View File

@ -34,7 +34,7 @@ function App() {
// We do not handle disconnection/re-connection in this example // We do not handle disconnection/re-connection in this example
if (wakuStatus === 'Connected') return; if (wakuStatus === 'Connected') return;
waku.waitForConnectedPeer().then(() => { waku.waitForRemotePeer().then(() => {
// We are now connected to a store node // We are now connected to a store node
setWakuStatus('Connected'); setWakuStatus('Connected');
}); });

View File

@ -129,7 +129,7 @@ export default function App() {
if (historicalMessagesRetrieved) return; if (historicalMessagesRetrieved) return;
const retrieveMessages = async () => { const retrieveMessages = async () => {
await waku.waitForConnectedPeer(); await waku.waitForRemotePeer();
console.log(`Retrieving archived messages}`); console.log(`Retrieving archived messages}`);
try { try {

View File

@ -10,10 +10,9 @@ import {
} from '../test_utils/'; } from '../test_utils/';
import { delay } from './delay'; import { delay } from './delay';
import { Waku } from './waku'; import { Protocols, Waku } from './waku';
import { WakuMessage } from './waku_message'; import { WakuMessage } from './waku_message';
import { generateSymmetricKey } from './waku_message/version_1'; import { generateSymmetricKey } from './waku_message/version_1';
import { RelayCodecs } from './waku_relay';
const dbg = debug('waku:test'); const dbg = debug('waku:test');
@ -39,7 +38,7 @@ describe('Waku Dial [node only]', function () {
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
}); });
await waku.dial(multiAddrWithId); await waku.dial(multiAddrWithId);
await waku.waitForConnectedPeer([RelayCodecs]); await waku.waitForRemotePeer([Protocols.Relay]);
let nimPeers = await nimWaku.peers(); let nimPeers = await nimWaku.peers();
while (nimPeers.length === 0) { while (nimPeers.length === 0) {
@ -176,16 +175,8 @@ describe('Decryption Keys', () => {
waku1.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); waku1.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);
await Promise.all([ await Promise.all([
new Promise((resolve) => waku1.waitForRemotePeer([Protocols.Relay]),
waku1.libp2p.pubsub.once('pubsub:subscription-change', () => waku2.waitForRemotePeer([Protocols.Relay]),
resolve(null)
)
),
new Promise((resolve) =>
waku2.libp2p.pubsub.once('pubsub:subscription-change', () =>
resolve(null)
)
),
]); ]);
}); });

View File

@ -13,7 +13,6 @@ import Websockets from 'libp2p-websockets';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment // eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: No types available // @ts-ignore: No types available
import filters from 'libp2p-websockets/src/filters'; import filters from 'libp2p-websockets/src/filters';
import { Peer } from 'libp2p/dist/src/peer-store';
import Ping from 'libp2p/src/ping'; import Ping from 'libp2p/src/ping';
import { Multiaddr, multiaddr } from 'multiaddr'; import { Multiaddr, multiaddr } from 'multiaddr';
import PeerId from 'peer-id'; import PeerId from 'peer-id';
@ -38,6 +37,12 @@ export const DefaultPubSubTopic = '/waku/2/default-waku/proto';
const dbg = debug('waku:waku'); const dbg = debug('waku:waku');
export enum Protocols {
Relay = 'relay',
Store = 'store',
LightPush = 'lightpush',
}
export interface CreateOptions { export interface CreateOptions {
/** /**
* The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}. * The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}.
@ -305,46 +310,91 @@ export class Waku {
} }
/** /**
* Wait to be connected to a peer. Useful when using the [[CreateOptions.bootstrap]] * Wait for a remote peer to be ready given the passed protocols.
* with [[Waku.create]]. The Promise resolves only once we are connected to a * Useful when using the [[CreateOptions.bootstrap]] with [[Waku.create]].
* Store peer, Relay peer and Light Push peer. *
* @default Remote peer must have Waku Store and Waku Relay enabled.
*/ */
async waitForConnectedPeer(protocols?: string[][]): Promise<void> { async waitForRemotePeer(protocols?: Protocols[]): Promise<void> {
const desiredProtocols = protocols ?? [ const desiredProtocols = protocols ?? [Protocols.Relay, Protocols.Store];
[StoreCodec],
[LightPushCodec],
RelayCodecs,
];
await Promise.all( const promises = [];
desiredProtocols.map((desiredProtocolVersions) => {
const peers = new Array<Peer>(); if (desiredProtocols.includes(Protocols.Relay)) {
desiredProtocolVersions.forEach((proto) => { const peers = [];
getPeersForProtocol(this.libp2p, proto).forEach((peer) =>
peers.push(peer) RelayCodecs.forEach((proto) => {
getPeersForProtocol(this.libp2p, proto).forEach((peer) =>
peers.push(peer)
);
});
if (peers.length == 0) {
// No peer available for this protocol, waiting to connect to one.
const promise = new Promise<void>((resolve) => {
this.libp2p.peerStore.on(
'change:protocols',
({ protocols: connectedPeerProtocols }) => {
RelayCodecs.forEach((relayProto) => {
if (connectedPeerProtocols.includes(relayProto)) {
// Relay peer is ready once subscription has happen.
this.libp2p.pubsub.once('pubsub:subscription-change', () => {
dbg('Resolving for', relayProto, connectedPeerProtocols);
resolve();
});
}
});
}
); );
}); });
promises.push(promise);
}
if (peers.length > 0) { if (desiredProtocols.includes(Protocols.Store)) {
return Promise.resolve(); const peers = getPeersForProtocol(this.libp2p, StoreCodec);
} else {
if (peers.length == 0) {
// No peer available for this protocol, waiting to connect to one. // No peer available for this protocol, waiting to connect to one.
return new Promise<void>((resolve) => { const promise = new Promise<void>((resolve) => {
this.libp2p.peerStore.on( this.libp2p.peerStore.on(
'change:protocols', 'change:protocols',
({ protocols: connectedPeerProtocols }) => { ({ protocols: connectedPeerProtocols }) => {
desiredProtocolVersions.forEach((desiredProto) => { if (connectedPeerProtocols.includes(StoreCodec)) {
if (connectedPeerProtocols.includes(desiredProto)) { dbg('Resolving for', StoreCodec, connectedPeerProtocols);
dbg('Resolving for', desiredProto, connectedPeerProtocols); resolve();
resolve(); }
}
});
} }
); );
}); });
promises.push(promise);
} }
}) }
);
if (desiredProtocols.includes(Protocols.LightPush)) {
const peers = getPeersForProtocol(this.libp2p, LightPushCodec);
if (peers.length == 0) {
// No peer available for this protocol, waiting to connect to one.
const promise = new Promise<void>((resolve) => {
this.libp2p.peerStore.on(
'change:protocols',
({ protocols: connectedPeerProtocols }) => {
if (connectedPeerProtocols.includes(LightPushCodec)) {
dbg('Resolving for', LightPushCodec, connectedPeerProtocols);
resolve();
}
}
);
});
promises.push(promise);
}
}
await Promise.all(promises);
}
await Promise.all(promises);
} }
private startKeepAlive( private startKeepAlive(

View File

@ -2,7 +2,7 @@ import { expect } from 'chai';
import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils'; import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils';
import { delay } from '../delay'; import { delay } from '../delay';
import { Waku } from '../waku'; import { Protocols, Waku } from '../waku';
import { WakuMessage } from '../waku_message'; import { WakuMessage } from '../waku_message';
const TestContentTopic = '/test/1/waku-light-push/utf8'; const TestContentTopic = '/test/1/waku-light-push/utf8';
@ -26,7 +26,7 @@ describe('Waku Light Push [node only]', () => {
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
}); });
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await waku.waitForConnectedPeer(); await waku.waitForRemotePeer([Protocols.LightPush]);
const messageText = 'Light Push works!'; const messageText = 'Light Push works!';
const message = await WakuMessage.fromUtf8String( const message = await WakuMessage.fromUtf8String(
@ -62,7 +62,7 @@ describe('Waku Light Push [node only]', () => {
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
}); });
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await waku.waitForConnectedPeer(); await waku.waitForRemotePeer([Protocols.LightPush]);
const nimPeerId = await nimWaku.getPeerId(); const nimPeerId = await nimWaku.getPeerId();

View File

@ -9,8 +9,7 @@ import {
} from '../../test_utils'; } from '../../test_utils';
import { delay } from '../delay'; import { delay } from '../delay';
import { hexToBuf } from '../utils'; import { hexToBuf } from '../utils';
import { Waku } from '../waku'; import { Protocols, Waku } from '../waku';
import { RelayCodecs } from '../waku_relay';
import { import {
generatePrivateKey, generatePrivateKey,
@ -39,7 +38,7 @@ describe('Waku Message [node only]', function () {
await nimWaku.start({ rpcPrivate: true }); await nimWaku.start({ rpcPrivate: true });
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await waku.waitForConnectedPeer([RelayCodecs]); await waku.waitForRemotePeer([Protocols.Relay]);
let peers = await waku.relay.getPeers(); let peers = await waku.relay.getPeers();
while (peers.size === 0) { while (peers.size === 0) {

View File

@ -8,7 +8,7 @@ import {
NOISE_KEY_2, NOISE_KEY_2,
} from '../../test_utils'; } from '../../test_utils';
import { delay } from '../delay'; import { delay } from '../delay';
import { DefaultPubSubTopic, Waku } from '../waku'; import { DefaultPubSubTopic, Protocols, Waku } from '../waku';
import { DecryptionMethod, WakuMessage } from '../waku_message'; import { DecryptionMethod, WakuMessage } from '../waku_message';
import { import {
generatePrivateKey, generatePrivateKey,
@ -16,8 +16,6 @@ import {
getPublicKey, getPublicKey,
} from '../waku_message/version_1'; } from '../waku_message/version_1';
import { RelayCodecs } from './constants';
const log = debug('waku:test'); const log = debug('waku:test');
const TestContentTopic = '/test/1/waku-relay/utf8'; const TestContentTopic = '/test/1/waku-relay/utf8';
@ -50,16 +48,8 @@ describe('Waku Relay [node only]', () => {
log('Wait for mutual pubsub subscription'); log('Wait for mutual pubsub subscription');
await Promise.all([ await Promise.all([
new Promise((resolve) => waku1.waitForRemotePeer([Protocols.Relay]),
waku1.libp2p.pubsub.once('pubsub:subscription-change', () => waku2.waitForRemotePeer([Protocols.Relay]),
resolve(null)
)
),
new Promise((resolve) =>
waku2.libp2p.pubsub.once('pubsub:subscription-change', () =>
resolve(null)
)
),
]); ]);
log('before each hook done'); log('before each hook done');
}); });
@ -279,16 +269,8 @@ describe('Waku Relay [node only]', () => {
waku3.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); waku3.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);
await Promise.all([ await Promise.all([
new Promise((resolve) => waku1.waitForRemotePeer([Protocols.Relay]),
waku1.libp2p.pubsub.once('pubsub:subscription-change', () => waku2.waitForRemotePeer([Protocols.Relay]),
resolve(null)
)
),
new Promise((resolve) =>
waku2.libp2p.pubsub.once('pubsub:subscription-change', () =>
resolve(null)
)
),
// No subscription change expected for Waku 3 // No subscription change expected for Waku 3
]); ]);
@ -336,12 +318,7 @@ describe('Waku Relay [node only]', () => {
await nimWaku.start(); await nimWaku.start();
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await waku.waitForConnectedPeer([RelayCodecs]); await waku.waitForRemotePeer([Protocols.Relay]);
// Wait for one heartbeat to ensure mesh is updated
await new Promise((resolve) => {
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve);
});
}); });
afterEach(async function () { afterEach(async function () {
@ -444,17 +421,8 @@ describe('Waku Relay [node only]', () => {
// Wait for identify protocol to finish // Wait for identify protocol to finish
await Promise.all([ await Promise.all([
waku1.waitForConnectedPeer([RelayCodecs]), waku1.waitForRemotePeer([Protocols.Relay]),
waku2.waitForConnectedPeer([RelayCodecs]), waku2.waitForRemotePeer([Protocols.Relay]),
]);
await Promise.all([
new Promise((resolve) =>
waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
new Promise((resolve) =>
waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
]); ]);
await delay(2000); await delay(2000);

View File

@ -8,7 +8,7 @@ import {
NOISE_KEY_2, NOISE_KEY_2,
} from '../../test_utils'; } from '../../test_utils';
import { delay } from '../delay'; import { delay } from '../delay';
import { Waku } from '../waku'; import { Protocols, Waku } from '../waku';
import { DecryptionMethod, WakuMessage } from '../waku_message'; import { DecryptionMethod, WakuMessage } from '../waku_message';
import { import {
generatePrivateKey, generatePrivateKey,
@ -18,8 +18,6 @@ import {
import { PageDirection } from './history_rpc'; import { PageDirection } from './history_rpc';
import { StoreCodec } from './index';
const dbg = debug('waku:test:store'); const dbg = debug('waku:test:store');
const TestContentTopic = '/test/1/waku-store/utf8'; const TestContentTopic = '/test/1/waku-store/utf8';
@ -51,7 +49,7 @@ describe('Waku Store', () => {
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
}); });
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await waku.waitForConnectedPeer([[StoreCodec]]); await waku.waitForRemotePeer([Protocols.Store]);
const messages = await waku.store.queryHistory([]); const messages = await waku.store.queryHistory([]);
expect(messages?.length).eq(2); expect(messages?.length).eq(2);
@ -81,7 +79,7 @@ describe('Waku Store', () => {
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
}); });
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await waku.waitForConnectedPeer([[StoreCodec]]); await waku.waitForRemotePeer([Protocols.Store]);
let messages: WakuMessage[] = []; let messages: WakuMessage[] = [];
@ -118,7 +116,7 @@ describe('Waku Store', () => {
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
}); });
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await waku.waitForConnectedPeer([[StoreCodec]]); await waku.waitForRemotePeer([Protocols.Store]);
let messages: WakuMessage[] = []; let messages: WakuMessage[] = [];
const desiredMsgs = 14; const desiredMsgs = 14;
@ -152,7 +150,7 @@ describe('Waku Store', () => {
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
}); });
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await waku.waitForConnectedPeer([[StoreCodec]]); await waku.waitForRemotePeer([Protocols.Store]);
const messages = await waku.store.queryHistory([], { const messages = await waku.store.queryHistory([], {
pageDirection: PageDirection.FORWARD, pageDirection: PageDirection.FORWARD,
@ -189,7 +187,7 @@ describe('Waku Store', () => {
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
}); });
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await waku.waitForConnectedPeer([[StoreCodec]]); await waku.waitForRemotePeer([Protocols.Store]);
const nimPeerId = await nimWaku.getPeerId(); const nimPeerId = await nimWaku.getPeerId();
@ -453,7 +451,7 @@ describe('Waku Store', () => {
staticNoiseKey: NOISE_KEY_1, staticNoiseKey: NOISE_KEY_1,
}); });
await waku.dial(await nimWaku.getMultiaddrWithId()); await waku.dial(await nimWaku.getMultiaddrWithId());
await waku.waitForConnectedPeer([[StoreCodec]]); await waku.waitForRemotePeer([Protocols.Store]);
const nimPeerId = await nimWaku.getPeerId(); const nimPeerId = await nimWaku.getPeerId();