Bump libp2p from 0.32.4 to 0.36.2, libp2p-gossipsub from 0.12.1 to 0.13.0 (#470)

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
This commit is contained in:
Franck R 2022-02-02 15:12:08 +11:00 committed by GitHub
parent f2cf00bf44
commit 69f0005445
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 620 additions and 857 deletions

View File

@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Changed
- **Breaking**: Upgrade `libp2p` to `0.36.2` & `libp2p-gossipsub` to `0.13.0`. Some APIs are now async.
## [0.16.0] - 2022-01-31 ## [0.16.0] - 2022-01-31
### Changed ### Changed

View File

@ -143,10 +143,16 @@ function App() {
useEffect(() => { useEffect(() => {
if (!waku) return; if (!waku) return;
const interval = setInterval(() => { const interval = setInterval(async () => {
let lightPushPeers = 0;
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _peer of waku.store.peers) {
lightPushPeers++;
}
setPeerStats({ setPeerStats({
relayPeers: waku.relay.getPeers().size, relayPeers: waku.relay.getPeers().size,
lightPushPeers: waku.lightPush.peers.length, lightPushPeers,
}); });
}, 1000); }, 1000);
return () => clearInterval(interval); return () => clearInterval(interval);

View File

@ -162,10 +162,16 @@ function App() {
useEffect(() => { useEffect(() => {
if (!waku) return; if (!waku) return;
const interval = setInterval(() => { const interval = setInterval(async () => {
let lightPushPeers = 0;
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _peer of waku.store.peers) {
lightPushPeers++;
}
setPeerStats({ setPeerStats({
relayPeers: waku.relay.getPeers().size, relayPeers: waku.relay.getPeers().size,
lightPushPeers: waku.lightPush.peers.length, lightPushPeers,
}); });
}, 1000); }, 1000);
return () => clearInterval(interval); return () => clearInterval(interval);

View File

@ -130,7 +130,7 @@ export default function App() {
const retrieveMessages = async () => { const retrieveMessages = async () => {
await waku.waitForRemotePeer(); await waku.waitForRemotePeer();
console.log(`Retrieving archived messages}`); console.log(`Retrieving archived messages`);
try { try {
retrieveStoreMessages(waku, dispatchMessages).then((length) => { retrieveStoreMessages(waku, dispatchMessages).then((length) => {
@ -156,11 +156,14 @@ export default function App() {
nick={nick} nick={nick}
messages={messages} messages={messages}
commandHandler={(input: string) => { commandHandler={(input: string) => {
const { command, response } = handleCommand(input, waku, setNick); handleCommand(input, waku, setNick).then(
const commandMessages = response.map((msg) => { ({ command, response }) => {
return Message.fromUtf8String(command, msg); const commandMessages = response.map((msg) => {
}); return Message.fromUtf8String(command, msg);
dispatchMessages(commandMessages); });
dispatchMessages(commandMessages);
}
);
}} }}
/> />
</ThemeProvider> </ThemeProvider>

View File

@ -6,6 +6,8 @@ import { useWaku } from './WakuContext';
import { TitleBar } from '@livechat/ui-kit'; import { TitleBar } from '@livechat/ui-kit';
import { Message } from './Message'; import { Message } from './Message';
import { ChatMessage } from './chat_message'; import { ChatMessage } from './chat_message';
import { useEffect, useState } from 'react';
import PeerId from 'peer-id';
interface Props { interface Props {
messages: Message[]; messages: Message[];
@ -16,12 +18,41 @@ interface Props {
export default function Room(props: Props) { export default function Room(props: Props) {
const { waku } = useWaku(); const { waku } = useWaku();
let relayPeers = 0; const [peers, setPeers] = useState<PeerId[]>([]);
let storePeers = 0; const [storePeers, setStorePeers] = useState(0);
if (waku) { const [relayPeers, setRelayPeers] = useState(0);
relayPeers = waku.relay.getPeers().size;
storePeers = waku.store.peers.length; useEffect(() => {
} // Add a peer to the list every time a connection happen to ensure the stats update correctly
if (!waku) return;
const addPeer = (event: { peerId: PeerId }) => {
setPeers((peers) => {
return [...peers, event.peerId];
});
};
waku.libp2p.peerStore.on('change:protocols', addPeer);
return () => {
waku.libp2p.connectionManager.removeListener('change:protocols', addPeer);
};
}, [waku]);
useEffect(() => {
if (!waku) return;
setRelayPeers(waku.relay.getPeers().size);
(async () => {
let counter = 0;
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _peer of waku.store.peers) {
counter++;
}
setStorePeers(counter);
})();
}, [waku, peers]);
return ( return (
<div <div

View File

@ -53,13 +53,18 @@ function connect(peer: string | undefined, waku: Waku | undefined): string[] {
} }
} }
function peers(waku: Waku | undefined): string[] { async function peers(waku: Waku | undefined): Promise<string[]> {
if (!waku) { if (!waku) {
return ['Waku node is starting']; return ['Waku node is starting'];
} }
let response: string[] = []; let response: string[] = [];
waku.libp2p.peerStore.peers.forEach((peer, peerId) => { const peers = [];
response.push(peerId + ':');
for await (const peer of waku.libp2p.peerStore.getPeers()) {
peers.push(peer);
}
Array.from(peers).forEach((peer) => {
response.push(peer.id.toB58String() + ':');
let addresses = ' addresses: ['; let addresses = ' addresses: [';
peer.addresses.forEach(({ multiaddr }) => { peer.addresses.forEach(({ multiaddr }) => {
addresses += ' ' + multiaddr.toString() + ','; addresses += ' ' + multiaddr.toString() + ',';
@ -104,11 +109,11 @@ function connections(waku: Waku | undefined): string[] {
return response; return response;
} }
export default function handleCommand( export default async function handleCommand(
input: string, input: string,
waku: Waku | undefined, waku: Waku | undefined,
setNick: (nick: string) => void setNick: (nick: string) => void
): { command: string; response: string[] } { ): Promise<{ command: string; response: string[] }> {
let response: string[] = []; let response: string[] = [];
const args = parseInput(input); const args = parseInput(input);
const command = args.shift()!; const command = args.shift()!;
@ -126,7 +131,7 @@ export default function handleCommand(
connect(args.shift(), waku).map((str) => response.push(str)); connect(args.shift(), waku).map((str) => response.push(str));
break; break;
case '/peers': case '/peers':
peers(waku).map((str) => response.push(str)); (await peers(waku)).map((str) => response.push(str));
break; break;
case '/connections': case '/connections':
connections(waku).map((str) => response.push(str)); connections(waku).map((str) => response.push(str));

1179
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -67,9 +67,9 @@
"it-concat": "^2.0.0", "it-concat": "^2.0.0",
"it-length-prefixed": "^5.0.2", "it-length-prefixed": "^5.0.2",
"js-sha3": "^0.8.0", "js-sha3": "^0.8.0",
"libp2p": "^0.32.4", "libp2p": "^0.36.2",
"libp2p-bootstrap": "^0.14.0", "libp2p-bootstrap": "^0.14.0",
"libp2p-gossipsub": "^0.12.1", "libp2p-gossipsub": "^0.13.0",
"libp2p-mplex": "^0.10.4", "libp2p-mplex": "^0.10.4",
"libp2p-websockets": "^0.16.1", "libp2p-websockets": "^0.16.1",
"multiaddr": "^10.0.1", "multiaddr": "^10.0.1",

View File

@ -5,7 +5,14 @@ import { Peer } from 'libp2p/src/peer-store';
* Returns a pseudo-random peer that supports the given protocol. * Returns a pseudo-random peer that supports the given protocol.
* Useful for protocols such as store and light push * Useful for protocols such as store and light push
*/ */
export function selectRandomPeer(peers: Peer[]): Peer | undefined { export async function selectRandomPeer(
peersIter: AsyncIterable<Peer>
): Promise<Peer | undefined> {
const peers = [];
for await (const peer of peersIter) {
peers.push(peer);
}
if (peers.length === 0) return; if (peers.length === 0) return;
const index = Math.round(Math.random() * (peers.length - 1)); const index = Math.round(Math.random() * (peers.length - 1));
@ -15,8 +22,14 @@ export function selectRandomPeer(peers: Peer[]): Peer | undefined {
/** /**
* Returns the list of peers that supports the given protocol. * Returns the list of peers that supports the given protocol.
*/ */
export function getPeersForProtocol(libp2p: Libp2p, protocol: string): Peer[] { export async function* getPeersForProtocol(
return Array.from(libp2p.peerStore.peers.values()).filter((peer) => libp2p: Libp2p,
peer.protocols.includes(protocol) protocol: string
); ): AsyncIterable<Peer> {
for await (const peer of libp2p.peerStore.getPeers()) {
if (!peer.protocols.includes(protocol)) {
continue;
}
yield peer;
}
} }

View File

@ -40,9 +40,7 @@ describe('Waku Dial [node only]', function () {
await waku.waitForRemotePeer([Protocols.Relay]); await waku.waitForRemotePeer([Protocols.Relay]);
const nimPeerId = await nimWaku.getPeerId(); const nimPeerId = await nimWaku.getPeerId();
const jsPeers = waku.libp2p.peerStore.peers; expect(await waku.libp2p.peerStore.has(nimPeerId)).to.be.true;
expect(jsPeers.has(nimPeerId.toB58String())).to.be.true;
}); });
}); });
@ -245,7 +243,12 @@ describe('Wait for remote peer / get peers', function () {
}); });
await waku.dial(multiAddrWithId); await waku.dial(multiAddrWithId);
await waku.waitForRemotePeer([Protocols.Store]); await waku.waitForRemotePeer([Protocols.Store]);
const peers = waku.store.peers.map((peer) => peer.id.toB58String());
const peers = [];
for await (const peer of waku.store.peers) {
peers.push(peer.id.toB58String());
}
const nimPeerId = multiAddrWithId.getPeerId(); const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined; expect(nimPeerId).to.not.be.undefined;
@ -263,7 +266,12 @@ describe('Wait for remote peer / get peers', function () {
}); });
await waku.dial(multiAddrWithId); await waku.dial(multiAddrWithId);
await waku.waitForRemotePeer([Protocols.LightPush]); await waku.waitForRemotePeer([Protocols.LightPush]);
const peers = waku.lightPush.peers.map((peer) => peer.id.toB58String());
const peers = [];
for await (const peer of waku.lightPush.peers) {
peers.push(peer.id.toB58String());
}
const nimPeerId = multiAddrWithId.getPeerId(); const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined; expect(nimPeerId).to.not.be.undefined;

View File

@ -13,7 +13,7 @@ import Websockets from 'libp2p-websockets';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment // eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: No types available // @ts-ignore: No types available
import filters from 'libp2p-websockets/src/filters'; import filters from 'libp2p-websockets/src/filters';
import Ping from 'libp2p/src/ping'; import PingService from 'libp2p/src/ping';
import { Multiaddr, multiaddr } from 'multiaddr'; import { Multiaddr, multiaddr } from 'multiaddr';
import PeerId from 'peer-id'; import PeerId from 'peer-id';
@ -336,9 +336,14 @@ export class Waku {
} }
if (desiredProtocols.includes(Protocols.Store)) { if (desiredProtocols.includes(Protocols.Store)) {
const peers = this.store.peers; let storePeerFound = false;
if (peers.length == 0) { for await (const _peer of this.store.peers) {
storePeerFound = true;
break;
}
if (!storePeerFound) {
// No peer available for this protocol, waiting to connect to one. // No peer available for this protocol, waiting to connect to one.
const promise = new Promise<void>((resolve) => { const promise = new Promise<void>((resolve) => {
this.libp2p.peerStore.on( this.libp2p.peerStore.on(
@ -356,9 +361,14 @@ export class Waku {
} }
if (desiredProtocols.includes(Protocols.LightPush)) { if (desiredProtocols.includes(Protocols.LightPush)) {
const peers = this.lightPush.peers; let lightPushPeerFound = false;
if (peers.length == 0) { for await (const _peer of this.lightPush.peers) {
lightPushPeerFound = true;
break;
}
if (!lightPushPeerFound) {
// No peer available for this protocol, waiting to connect to one. // No peer available for this protocol, waiting to connect to one.
const promise = new Promise<void>((resolve) => { const promise = new Promise<void>((resolve) => {
this.libp2p.peerStore.on( this.libp2p.peerStore.on(
@ -390,8 +400,11 @@ export class Waku {
const peerIdStr = peerId.toB58String(); const peerIdStr = peerId.toB58String();
if (pingPeriodSecs !== 0) { if (pingPeriodSecs !== 0) {
const pingService = new PingService(this.libp2p);
this.pingKeepAliveTimers[peerIdStr] = setInterval(() => { this.pingKeepAliveTimers[peerIdStr] = setInterval(() => {
Ping(this.libp2p, peerId); pingService.ping(peerId).catch((e) => {
dbg(`Ping failed (${peerIdStr})`, e);
});
}, pingPeriodSecs * 1000); }, pingPeriodSecs * 1000);
} }

View File

@ -52,10 +52,10 @@ export class WakuLightPush {
): Promise<PushResponse | null> { ): Promise<PushResponse | null> {
let peer; let peer;
if (opts?.peerId) { if (opts?.peerId) {
peer = this.libp2p.peerStore.get(opts.peerId); peer = await this.libp2p.peerStore.get(opts.peerId);
if (!peer) throw 'Peer is unknown'; if (!peer) throw 'Peer is unknown';
} else { } else {
peer = this.randomPeer; peer = await this.randomPeer;
} }
if (!peer) throw 'No peer available'; if (!peer) throw 'No peer available';
if (!peer.protocols.includes(LightPushCodec)) if (!peer.protocols.includes(LightPushCodec))
@ -99,7 +99,7 @@ export class WakuLightPush {
* Returns known peers from the address book (`libp2p.peerStore`) that support * Returns known peers from the address book (`libp2p.peerStore`) that support
* light push protocol. Waku may or may not be currently connected to these peers. * light push protocol. Waku may or may not be currently connected to these peers.
*/ */
get peers(): Peer[] { get peers(): AsyncIterable<Peer> {
return getPeersForProtocol(this.libp2p, LightPushCodec); return getPeersForProtocol(this.libp2p, LightPushCodec);
} }
@ -108,7 +108,7 @@ export class WakuLightPush {
* book (`libp2p.peerStore`). Waku may or may not be currently connected to * book (`libp2p.peerStore`). Waku may or may not be currently connected to
* this peer. * this peer.
*/ */
get randomPeer(): Peer | undefined { get randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(this.peers); return selectRandomPeer(this.peers);
} }
} }

View File

@ -428,12 +428,9 @@ describe('Waku Relay [node only]', () => {
await delay(2000); await delay(2000);
// Check that the two JS peers are NOT directly connected // Check that the two JS peers are NOT directly connected
expect( expect(await waku1.libp2p.peerStore.has(waku2.libp2p.peerId)).to.be
waku1.libp2p.peerStore.peers.has(waku2.libp2p.peerId.toB58String()) .false;
).to.be.false; expect(waku2.libp2p.peerStore.has(waku1.libp2p.peerId)).to.be.false;
expect(
waku2.libp2p.peerStore.peers.has(waku1.libp2p.peerId.toB58String())
).to.be.false;
const msgStr = 'Hello there!'; const msgStr = 'Hello there!';
const message = await WakuMessage.fromUtf8String( const message = await WakuMessage.fromUtf8String(

View File

@ -8,11 +8,7 @@ import {
PeerScoreParams, PeerScoreParams,
PeerScoreThresholds, PeerScoreThresholds,
} from 'libp2p-gossipsub/src/score'; } from 'libp2p-gossipsub/src/score';
import { import { createGossipRpc, shuffle } from 'libp2p-gossipsub/src/utils';
createGossipRpc,
messageIdToString,
shuffle,
} from 'libp2p-gossipsub/src/utils';
import { InMessage } from 'libp2p-interfaces/src/pubsub'; import { InMessage } from 'libp2p-interfaces/src/pubsub';
import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
import PeerId from 'peer-id'; import PeerId from 'peer-id';
@ -112,8 +108,8 @@ export class WakuRelay extends Gossipsub {
* @override * @override
* @returns {void} * @returns {void}
*/ */
public start(): void { public async start(): Promise<void> {
super.start(); await super.start();
this.subscribe(this.pubSubTopic); this.subscribe(this.pubSubTopic);
} }
@ -331,17 +327,16 @@ export class WakuRelay extends Gossipsub {
* @returns {void} * @returns {void}
*/ */
async _publish(msg: InMessage): Promise<void> { async _publish(msg: InMessage): Promise<void> {
const msgIdStr = await this.getCanonicalMsgIdStr(msg);
if (msg.receivedFrom !== this.peerId.toB58String()) { if (msg.receivedFrom !== this.peerId.toB58String()) {
this.score.deliverMessage(msg); this.score.deliverMessage(msg, msgIdStr);
this.gossipTracer.deliverMessage(msg); this.gossipTracer.deliverMessage(msgIdStr);
} }
const msgID = await this.getMsgId(msg);
const msgIdStr = messageIdToString(msgID);
// put in seen cache // put in seen cache
this.seenCache.put(msgIdStr); this.seenCache.put(msgIdStr);
this.messageCache.put(msg); this.messageCache.put(msg, msgIdStr);
const toSend = new Set<string>(); const toSend = new Set<string>();
msg.topicIDs.forEach((topic) => { msg.topicIDs.forEach((topic) => {
@ -489,36 +484,47 @@ export class WakuRelay extends Gossipsub {
* @param {string} id * @param {string} id
* @param {string} topic * @param {string} topic
* @param {boolean} doPX * @param {boolean} doPX
* @returns {RPC.IControlPrune} * @returns {Promise<RPC.IControlPrune>}
*/ */
_makePrune(id: string, topic: string, doPX: boolean): RPC.IControlPrune { async _makePrune(
id: string,
topic: string,
doPX: boolean
): Promise<RPC.IControlPrune> {
// backoff is measured in seconds // backoff is measured in seconds
// RelayPruneBackoff is measured in milliseconds // RelayPruneBackoff is measured in milliseconds
const backoff = constants.RelayPruneBackoff / 1000; const backoff = constants.RelayPruneBackoff / 1000;
const px: RPC.IPeerInfo[] = []; if (!doPX) {
if (doPX) { return {
// select peers for Peer eXchange topicID: topic,
const peers = getRelayPeers( peers: [],
this, backoff: backoff,
topic, };
constants.RelayPrunePeers, }
(xid: string): boolean => {
return xid !== id && this.score.score(xid) >= 0; // select peers for Peer eXchange
} const peers = getRelayPeers(
); this,
peers.forEach((p) => { topic,
constants.RelayPrunePeers,
(xid: string): boolean => {
return xid !== id && this.score.score(xid) >= 0;
}
);
const px = await Promise.all(
Array.from(peers).map(async (p) => {
// see if we have a signed record to send back; if we don't, just send // see if we have a signed record to send back; if we don't, just send
// the peer ID and let the pruned peer find them in the DHT -- we can't trust // the peer ID and let the pruned peer find them in the DHT -- we can't trust
// unsigned address records through PX anyways // unsigned address records through PX anyways
// Finding signed records in the DHT is not supported at the time of writing in js-libp2p // Finding signed records in the DHT is not supported at the time of writing in js-libp2p
const peerId = PeerId.createFromB58String(p); const peerId = PeerId.createFromB58String(p);
px.push({ return {
peerID: peerId.toBytes(), peerID: peerId.toBytes(),
signedPeerRecord: signedPeerRecord:
this._libp2p.peerStore.addressBook.getRawEnvelope(peerId), await this._libp2p.peerStore.addressBook.getRawEnvelope(peerId),
}); };
}); })
} );
return { return {
topicID: topic, topicID: topic,
peers: px, peers: px,

View File

@ -268,10 +268,13 @@ describe('Waku Store', () => {
dbg('Waku nodes connected to nim Waku'); dbg('Waku nodes connected to nim Waku');
let lightPushPeers = waku1.lightPush.peers; let lightPushPeerFound = false;
while (lightPushPeers.length == 0) { while (!lightPushPeerFound) {
await delay(100); await delay(100);
lightPushPeers = waku1.lightPush.peers; for await (const _peer of waku1.lightPush.peers) {
lightPushPeerFound = true;
break;
}
} }
dbg('Sending messages using light push'); dbg('Sending messages using light push');
@ -282,10 +285,13 @@ describe('Waku Store', () => {
waku1.lightPush.push(clearMessage), waku1.lightPush.push(clearMessage),
]); ]);
let storePeers = waku2.store.peers; let storePeerFound = false;
while (storePeers.length == 0) { while (!storePeerFound) {
await delay(100); await delay(100);
storePeers = waku2.store.peers; for await (const _peer of waku2.store.peers) {
storePeerFound = true;
break;
}
} }
waku2.store.addDecryptionKey(symKey); waku2.store.addDecryptionKey(symKey);
@ -380,10 +386,13 @@ describe('Waku Store', () => {
dbg('Waku nodes connected to nim Waku'); dbg('Waku nodes connected to nim Waku');
let lightPushPeers = waku1.lightPush.peers; let lightPushPeerFound = false;
while (lightPushPeers.length == 0) { while (!lightPushPeerFound) {
await delay(100); await delay(100);
lightPushPeers = waku1.lightPush.peers; for await (const _peer of waku1.lightPush.peers) {
lightPushPeerFound = true;
break;
}
} }
dbg('Sending messages using light push'); dbg('Sending messages using light push');
@ -394,10 +403,13 @@ describe('Waku Store', () => {
waku1.lightPush.push(clearMessage), waku1.lightPush.push(clearMessage),
]); ]);
let storePeers = waku2.store.peers; let storePeerFound = false;
while (storePeers.length == 0) { while (!storePeerFound) {
await delay(100); await delay(100);
storePeers = waku2.store.peers; for await (const _peer of waku2.store.peers) {
storePeerFound = true;
break;
}
} }
waku2.addDecryptionKey(symKey, { waku2.addDecryptionKey(symKey, {

View File

@ -150,11 +150,11 @@ export class WakuStore {
let peer; let peer;
if (opts.peerId) { if (opts.peerId) {
peer = this.libp2p.peerStore.get(opts.peerId); peer = await this.libp2p.peerStore.get(opts.peerId);
if (!peer) if (!peer)
throw `Failed to retrieve connection details for provided peer in peer store: ${opts.peerId.toB58String()}`; throw `Failed to retrieve connection details for provided peer in peer store: ${opts.peerId.toB58String()}`;
} else { } else {
peer = this.randomPeer; peer = await this.randomPeer;
if (!peer) if (!peer)
throw 'Failed to find known peer that registers waku store protocol'; throw 'Failed to find known peer that registers waku store protocol';
} }
@ -290,7 +290,7 @@ export class WakuStore {
* Returns known peers from the address book (`libp2p.peerStore`) that support * Returns known peers from the address book (`libp2p.peerStore`) that support
* store protocol. Waku may or may not be currently connected to these peers. * store protocol. Waku may or may not be currently connected to these peers.
*/ */
get peers(): Peer[] { get peers(): AsyncIterable<Peer> {
return getPeersForProtocol(this.libp2p, StoreCodec); return getPeersForProtocol(this.libp2p, StoreCodec);
} }
@ -299,7 +299,7 @@ export class WakuStore {
* book (`libp2p.peerStore`). Waku may or may not be currently connected to * book (`libp2p.peerStore`). Waku may or may not be currently connected to
* this peer. * this peer.
*/ */
get randomPeer(): Peer | undefined { get randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(this.peers); return selectRandomPeer(this.peers);
} }
} }