From 939b5fb20a638b2099ca47dd5fbcaab911211ece Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 16 Jun 2021 23:37:13 +1000 Subject: [PATCH] Auto select peer if none provided for store and light push protocols --- CHANGELOG.md | 5 ++++- examples/cli-chat/src/chat.ts | 20 ++------------------ src/lib/select_peer.ts | 18 ++++++++++++++++++ src/lib/waku_light_push/index.spec.ts | 8 ++++---- src/lib/waku_light_push/index.ts | 22 ++++++++++++++++++---- src/lib/waku_store/index.spec.ts | 6 ------ src/lib/waku_store/index.ts | 13 ++++++++++--- 7 files changed, 56 insertions(+), 36 deletions(-) create mode 100644 src/lib/select_peer.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 3011f14017..7c385cdd44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -## Added +### Added - `WakuRelay.deleteObserver` to allow removal of observers, useful when a React component add observers when mounting and needs to delete it when unmounting. +### Changed +- **Breaking**: Auto select peer if none provided for store and light push protocols. + ## [0.7.0] - 2021-06-15 ### Changed diff --git a/examples/cli-chat/src/chat.ts b/examples/cli-chat/src/chat.ts index b399d84134..6961ddca2c 100644 --- a/examples/cli-chat/src/chat.ts +++ b/examples/cli-chat/src/chat.ts @@ -6,7 +6,6 @@ import { Direction, Environment, getStatusFleetNodes, - LightPushCodec, Protocol, StoreCodec, Waku, @@ -14,7 +13,6 @@ import { } from 'js-waku'; import TCP from 'libp2p-tcp'; import { multiaddr, Multiaddr } from 'multiaddr'; -import PeerId from 'peer-id'; const ChatContentTopic = '/toy-chat/2/huilong/proto'; @@ -100,20 +98,6 @@ export default async function startChat(): Promise { } ); - let lightPushNode: PeerId | undefined = undefined; - // Select a node for light pushing (any node). - if (opts.lightPush) { - waku.libp2p.peerStore.on( - 'change:protocols', - async ({ peerId, protocols }) => { - if (!lightPushNode && protocols.includes(LightPushCodec)) { - console.log(`Using ${peerId.toB58String()} to light push messages`); - lightPushNode = peerId; - } - } - ); - } - console.log('Ready to chat!'); rl.prompt(); for await (const line of rl) { @@ -121,8 +105,8 @@ export default async function startChat(): Promise { const chatMessage = ChatMessage.fromUtf8String(new Date(), nick, line); const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic); - if (lightPushNode && opts.lightPush) { - await waku.lightPush.push(lightPushNode, msg); + if (opts.lightPush) { + await waku.lightPush.push(msg); } else { await waku.relay.send(msg); } diff --git a/src/lib/select_peer.ts b/src/lib/select_peer.ts new file mode 100644 index 0000000000..8e01850ec6 --- /dev/null +++ b/src/lib/select_peer.ts @@ -0,0 +1,18 @@ +import Libp2p from 'libp2p'; +import { Peer } from 'libp2p/src/peer-store'; + +/** + * Returns a pseudo-random peer that supports the given protocol. + * Useful for protocols such as store and light push + */ +export function selectRandomPeer( + libp2p: Libp2p, + protocol: string +): Peer | undefined { + const allPeers = Array.from(libp2p.peerStore.peers.values()); + const size = allPeers.length; + const peers = allPeers.filter((peer) => peer.protocols.includes(protocol)); + if (peers.length === 0) return; + const index = Math.round(Math.random() * (size - 1)); + return allPeers[index]; +} diff --git a/src/lib/waku_light_push/index.spec.ts b/src/lib/waku_light_push/index.spec.ts index ad9e1e062d..607f1ca04c 100644 --- a/src/lib/waku_light_push/index.spec.ts +++ b/src/lib/waku_light_push/index.spec.ts @@ -32,12 +32,10 @@ describe('Waku Light Push', () => { waku.libp2p.peerStore.once('change:protocols', resolve); }); - const nimPeerId = await nimWaku.getPeerId(); - const messageText = 'Light Push works!'; const message = WakuMessage.fromUtf8String(messageText); - const pushResponse = await waku.lightPush.push(nimPeerId, message); + const pushResponse = await waku.lightPush.push(message); expect(pushResponse?.isSuccess).to.be.true; let msgs: WakuMessage[] = []; @@ -77,7 +75,9 @@ describe('Waku Light Push', () => { const messageText = 'Light Push works!'; const message = WakuMessage.fromUtf8String(messageText); - const pushResponse = await waku.lightPush.push(nimPeerId, message); + const pushResponse = await waku.lightPush.push(message, { + peerId: nimPeerId, + }); expect(pushResponse?.isSuccess).to.be.true; let msgs: WakuMessage[] = []; diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index da86796906..88551756de 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -5,6 +5,7 @@ import Libp2p from 'libp2p'; import PeerId from 'peer-id'; import { PushResponse } from '../../proto/waku/v2/light_push'; +import { selectRandomPeer } from '../select_peer'; import { WakuMessage } from '../waku_message'; import { DefaultPubsubTopic } from '../waku_relay'; @@ -25,6 +26,11 @@ export interface CreateOptions { pubsubTopic?: string; } +export interface PushOptions { + peerId?: PeerId; + pubsubTopic?: string; +} + /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ @@ -40,12 +46,17 @@ export class WakuLightPush { } async push( - peerId: PeerId, message: WakuMessage, - pubsubTopic: string = this.pubsubTopic + opts?: PushOptions ): Promise { - const peer = this.libp2p.peerStore.get(peerId); - if (!peer) throw 'Peer is unknown'; + let peer; + if (opts?.peerId) { + peer = this.libp2p.peerStore.get(opts.peerId); + if (!peer) throw 'Peer is unknown'; + } else { + peer = selectRandomPeer(this.libp2p, LightPushCodec); + } + if (!peer) throw 'No peer available'; if (!peer.protocols.includes(LightPushCodec)) throw 'Peer does not register waku light push protocol'; @@ -54,6 +65,9 @@ export class WakuLightPush { const { stream } = await connection.newStream(LightPushCodec); try { + const pubsubTopic = opts?.pubsubTopic + ? opts.pubsubTopic + : this.pubsubTopic; const query = PushRPC.createRequest(message, pubsubTopic); const res = await pipe( [query.encode()], diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index 3caf210352..9e0ee71ea2 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -39,10 +39,7 @@ describe('Waku Store', () => { waku.libp2p.peerStore.once('change:protocols', resolve); }); - const nimPeerId = await nimWaku.getPeerId(); - const messages = await waku.store.queryHistory({ - peerId: nimPeerId, contentTopics: [], }); @@ -76,10 +73,7 @@ describe('Waku Store', () => { waku.libp2p.peerStore.once('change:protocols', resolve); }); - const nimPeerId = await nimWaku.getPeerId(); - const messages = await waku.store.queryHistory({ - peerId: nimPeerId, contentTopics: [DefaultContentTopic], direction: Direction.FORWARD, }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index bb14790f2d..88102ce2e2 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -4,6 +4,7 @@ import pipe from 'it-pipe'; import Libp2p from 'libp2p'; import PeerId from 'peer-id'; +import { selectRandomPeer } from '../select_peer'; import { WakuMessage } from '../waku_message'; import { DefaultPubsubTopic } from '../waku_relay'; @@ -26,7 +27,7 @@ export interface CreateOptions { } export interface QueryOptions { - peerId: PeerId; + peerId?: PeerId; contentTopics: string[]; pubsubTopic?: string; direction?: Direction; @@ -70,8 +71,14 @@ export class WakuStore { options ); - const peer = this.libp2p.peerStore.get(opts.peerId); - if (!peer) throw 'Peer is unknown'; + let peer; + if (opts.peerId) { + peer = this.libp2p.peerStore.get(opts.peerId); + if (!peer) throw 'Peer is unknown'; + } else { + peer = selectRandomPeer(this.libp2p, StoreCodec); + } + if (!peer) throw 'No peer available'; if (!peer.protocols.includes(StoreCodec)) throw 'Peer does not register waku store protocol'; const connection = this.libp2p.connectionManager.get(peer.id);