diff --git a/CHANGELOG.md b/CHANGELOG.md index 94c321adb3..7c385cdd44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- `WakuRelay.deleteObserver` to allow removal of observers, useful when a React component add observers when mounting and needs to delete it when unmounting. + +### Changed +- **Breaking**: Auto select peer if none provided for store and light push protocols. + ## [0.7.0] - 2021-06-15 ### Changed diff --git a/examples/cli-chat/src/chat.ts b/examples/cli-chat/src/chat.ts index b399d84134..6961ddca2c 100644 --- a/examples/cli-chat/src/chat.ts +++ b/examples/cli-chat/src/chat.ts @@ -6,7 +6,6 @@ import { Direction, Environment, getStatusFleetNodes, - LightPushCodec, Protocol, StoreCodec, Waku, @@ -14,7 +13,6 @@ import { } from 'js-waku'; import TCP from 'libp2p-tcp'; import { multiaddr, Multiaddr } from 'multiaddr'; -import PeerId from 'peer-id'; const ChatContentTopic = '/toy-chat/2/huilong/proto'; @@ -100,20 +98,6 @@ export default async function startChat(): Promise { } ); - let lightPushNode: PeerId | undefined = undefined; - // Select a node for light pushing (any node). - if (opts.lightPush) { - waku.libp2p.peerStore.on( - 'change:protocols', - async ({ peerId, protocols }) => { - if (!lightPushNode && protocols.includes(LightPushCodec)) { - console.log(`Using ${peerId.toB58String()} to light push messages`); - lightPushNode = peerId; - } - } - ); - } - console.log('Ready to chat!'); rl.prompt(); for await (const line of rl) { @@ -121,8 +105,8 @@ export default async function startChat(): Promise { const chatMessage = ChatMessage.fromUtf8String(new Date(), nick, line); const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic); - if (lightPushNode && opts.lightPush) { - await waku.lightPush.push(lightPushNode, msg); + if (opts.lightPush) { + await waku.lightPush.push(msg); } else { await waku.relay.send(msg); } diff --git a/src/lib/select_peer.ts b/src/lib/select_peer.ts new file mode 100644 index 0000000000..8e01850ec6 --- /dev/null +++ b/src/lib/select_peer.ts @@ -0,0 +1,18 @@ +import Libp2p from 'libp2p'; +import { Peer } from 'libp2p/src/peer-store'; + +/** + * Returns a pseudo-random peer that supports the given protocol. + * Useful for protocols such as store and light push + */ +export function selectRandomPeer( + libp2p: Libp2p, + protocol: string +): Peer | undefined { + const allPeers = Array.from(libp2p.peerStore.peers.values()); + const size = allPeers.length; + const peers = allPeers.filter((peer) => peer.protocols.includes(protocol)); + if (peers.length === 0) return; + const index = Math.round(Math.random() * (size - 1)); + return allPeers[index]; +} diff --git a/src/lib/waku_light_push/index.spec.ts b/src/lib/waku_light_push/index.spec.ts index ad9e1e062d..607f1ca04c 100644 --- a/src/lib/waku_light_push/index.spec.ts +++ b/src/lib/waku_light_push/index.spec.ts @@ -32,12 +32,10 @@ describe('Waku Light Push', () => { waku.libp2p.peerStore.once('change:protocols', resolve); }); - const nimPeerId = await nimWaku.getPeerId(); - const messageText = 'Light Push works!'; const message = WakuMessage.fromUtf8String(messageText); - const pushResponse = await waku.lightPush.push(nimPeerId, message); + const pushResponse = await waku.lightPush.push(message); expect(pushResponse?.isSuccess).to.be.true; let msgs: WakuMessage[] = []; @@ -77,7 +75,9 @@ describe('Waku Light Push', () => { const messageText = 'Light Push works!'; const message = WakuMessage.fromUtf8String(messageText); - const pushResponse = await waku.lightPush.push(nimPeerId, message); + const pushResponse = await waku.lightPush.push(message, { + peerId: nimPeerId, + }); expect(pushResponse?.isSuccess).to.be.true; let msgs: WakuMessage[] = []; diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index da86796906..88551756de 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -5,6 +5,7 @@ import Libp2p from 'libp2p'; import PeerId from 'peer-id'; import { PushResponse } from '../../proto/waku/v2/light_push'; +import { selectRandomPeer } from '../select_peer'; import { WakuMessage } from '../waku_message'; import { DefaultPubsubTopic } from '../waku_relay'; @@ -25,6 +26,11 @@ export interface CreateOptions { pubsubTopic?: string; } +export interface PushOptions { + peerId?: PeerId; + pubsubTopic?: string; +} + /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ @@ -40,12 +46,17 @@ export class WakuLightPush { } async push( - peerId: PeerId, message: WakuMessage, - pubsubTopic: string = this.pubsubTopic + opts?: PushOptions ): Promise { - const peer = this.libp2p.peerStore.get(peerId); - if (!peer) throw 'Peer is unknown'; + let peer; + if (opts?.peerId) { + peer = this.libp2p.peerStore.get(opts.peerId); + if (!peer) throw 'Peer is unknown'; + } else { + peer = selectRandomPeer(this.libp2p, LightPushCodec); + } + if (!peer) throw 'No peer available'; if (!peer.protocols.includes(LightPushCodec)) throw 'Peer does not register waku light push protocol'; @@ -54,6 +65,9 @@ export class WakuLightPush { const { stream } = await connection.newStream(LightPushCodec); try { + const pubsubTopic = opts?.pubsubTopic + ? opts.pubsubTopic + : this.pubsubTopic; const query = PushRPC.createRequest(message, pubsubTopic); const res = await pipe( [query.encode()], diff --git a/src/lib/waku_relay/index.spec.ts b/src/lib/waku_relay/index.spec.ts index 6c1c26e88c..cc6b698859 100644 --- a/src/lib/waku_relay/index.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -140,6 +140,30 @@ describe('Waku Relay', () => { expect(allMessages[1].version).to.eq(barMessage.version); expect(allMessages[1].payloadAsUtf8).to.eq(barMessageText); }); + + it('Delete observer', async function () { + this.timeout(10000); + + const messageText = + 'Published on content topic with added then deleted observer'; + const message = WakuMessage.fromUtf8String( + messageText, + 'added-then-deleted-observer' + ); + + // The promise **fails** if we receive a message on this observer. + const receivedMsgPromise: Promise = new Promise( + (resolve, reject) => { + waku2.relay.addObserver(reject, ['added-then-deleted-observer']); + waku2.relay.deleteObserver(reject, ['added-then-deleted-observer']); + setTimeout(resolve, 500); + } + ); + await waku1.relay.send(message); + + await receivedMsgPromise; + // If it does not throw then we are good. + }); }); describe('Custom pubsub topic', () => { diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index c846b7f85e..f466ec6e72 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -65,7 +65,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { * Observers under key "" are always called. */ public observers: { - [contentTopic: string]: Array<(message: WakuMessage) => void>; + [contentTopic: string]: Set<(message: WakuMessage) => void>; }; constructor( @@ -131,15 +131,37 @@ export class WakuRelay extends Gossipsub implements Pubsub { ): void { if (contentTopics.length === 0) { if (!this.observers['']) { - this.observers[''] = []; + this.observers[''] = new Set(); } - this.observers[''].push(callback); + this.observers[''].add(callback); } else { contentTopics.forEach((contentTopic) => { if (!this.observers[contentTopic]) { - this.observers[contentTopic] = []; + this.observers[contentTopic] = new Set(); + } + this.observers[contentTopic].add(callback); + }); + } + } + + /** + * Remove an observer of new messages received via waku relay. + * Useful to ensure the same observer is not registered several time + * (e.g when loading React components) + */ + deleteObserver( + callback: (message: WakuMessage) => void, + contentTopics: string[] = [] + ): void { + if (contentTopics.length === 0) { + if (this.observers['']) { + this.observers[''].delete(callback); + } + } else { + contentTopics.forEach((contentTopic) => { + if (this.observers[contentTopic]) { + this.observers[contentTopic].delete(callback); } - this.observers[contentTopic].push(callback); }); } } diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index 3caf210352..9e0ee71ea2 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -39,10 +39,7 @@ describe('Waku Store', () => { waku.libp2p.peerStore.once('change:protocols', resolve); }); - const nimPeerId = await nimWaku.getPeerId(); - const messages = await waku.store.queryHistory({ - peerId: nimPeerId, contentTopics: [], }); @@ -76,10 +73,7 @@ describe('Waku Store', () => { waku.libp2p.peerStore.once('change:protocols', resolve); }); - const nimPeerId = await nimWaku.getPeerId(); - const messages = await waku.store.queryHistory({ - peerId: nimPeerId, contentTopics: [DefaultContentTopic], direction: Direction.FORWARD, }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index bb14790f2d..88102ce2e2 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -4,6 +4,7 @@ import pipe from 'it-pipe'; import Libp2p from 'libp2p'; import PeerId from 'peer-id'; +import { selectRandomPeer } from '../select_peer'; import { WakuMessage } from '../waku_message'; import { DefaultPubsubTopic } from '../waku_relay'; @@ -26,7 +27,7 @@ export interface CreateOptions { } export interface QueryOptions { - peerId: PeerId; + peerId?: PeerId; contentTopics: string[]; pubsubTopic?: string; direction?: Direction; @@ -70,8 +71,14 @@ export class WakuStore { options ); - const peer = this.libp2p.peerStore.get(opts.peerId); - if (!peer) throw 'Peer is unknown'; + let peer; + if (opts.peerId) { + peer = this.libp2p.peerStore.get(opts.peerId); + if (!peer) throw 'Peer is unknown'; + } else { + peer = selectRandomPeer(this.libp2p, StoreCodec); + } + if (!peer) throw 'No peer available'; if (!peer.protocols.includes(StoreCodec)) throw 'Peer does not register waku store protocol'; const connection = this.libp2p.connectionManager.get(peer.id);