Merge pull request #208 from status-im/auto-selection-of-peers

This commit is contained in:
Franck Royer 2021-06-17 10:40:03 +10:00 committed by GitHub
commit 3ca365c6f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 109 additions and 40 deletions

View File

@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Added
- `WakuRelay.deleteObserver` to allow removal of observers, useful when a React component add observers when mounting and needs to delete it when unmounting.
### Changed
- **Breaking**: Auto select peer if none provided for store and light push protocols.
## [0.7.0] - 2021-06-15 ## [0.7.0] - 2021-06-15
### Changed ### Changed

View File

@ -6,7 +6,6 @@ import {
Direction, Direction,
Environment, Environment,
getStatusFleetNodes, getStatusFleetNodes,
LightPushCodec,
Protocol, Protocol,
StoreCodec, StoreCodec,
Waku, Waku,
@ -14,7 +13,6 @@ import {
} from 'js-waku'; } from 'js-waku';
import TCP from 'libp2p-tcp'; import TCP from 'libp2p-tcp';
import { multiaddr, Multiaddr } from 'multiaddr'; import { multiaddr, Multiaddr } from 'multiaddr';
import PeerId from 'peer-id';
const ChatContentTopic = '/toy-chat/2/huilong/proto'; const ChatContentTopic = '/toy-chat/2/huilong/proto';
@ -100,20 +98,6 @@ export default async function startChat(): Promise<void> {
} }
); );
let lightPushNode: PeerId | undefined = undefined;
// Select a node for light pushing (any node).
if (opts.lightPush) {
waku.libp2p.peerStore.on(
'change:protocols',
async ({ peerId, protocols }) => {
if (!lightPushNode && protocols.includes(LightPushCodec)) {
console.log(`Using ${peerId.toB58String()} to light push messages`);
lightPushNode = peerId;
}
}
);
}
console.log('Ready to chat!'); console.log('Ready to chat!');
rl.prompt(); rl.prompt();
for await (const line of rl) { for await (const line of rl) {
@ -121,8 +105,8 @@ export default async function startChat(): Promise<void> {
const chatMessage = ChatMessage.fromUtf8String(new Date(), nick, line); const chatMessage = ChatMessage.fromUtf8String(new Date(), nick, line);
const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic); const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic);
if (lightPushNode && opts.lightPush) { if (opts.lightPush) {
await waku.lightPush.push(lightPushNode, msg); await waku.lightPush.push(msg);
} else { } else {
await waku.relay.send(msg); await waku.relay.send(msg);
} }

18
src/lib/select_peer.ts Normal file
View File

@ -0,0 +1,18 @@
import Libp2p from 'libp2p';
import { Peer } from 'libp2p/src/peer-store';
/**
* Returns a pseudo-random peer that supports the given protocol.
* Useful for protocols such as store and light push
*/
export function selectRandomPeer(
libp2p: Libp2p,
protocol: string
): Peer | undefined {
const allPeers = Array.from(libp2p.peerStore.peers.values());
const size = allPeers.length;
const peers = allPeers.filter((peer) => peer.protocols.includes(protocol));
if (peers.length === 0) return;
const index = Math.round(Math.random() * (size - 1));
return allPeers[index];
}

View File

@ -32,12 +32,10 @@ describe('Waku Light Push', () => {
waku.libp2p.peerStore.once('change:protocols', resolve); waku.libp2p.peerStore.once('change:protocols', resolve);
}); });
const nimPeerId = await nimWaku.getPeerId();
const messageText = 'Light Push works!'; const messageText = 'Light Push works!';
const message = WakuMessage.fromUtf8String(messageText); const message = WakuMessage.fromUtf8String(messageText);
const pushResponse = await waku.lightPush.push(nimPeerId, message); const pushResponse = await waku.lightPush.push(message);
expect(pushResponse?.isSuccess).to.be.true; expect(pushResponse?.isSuccess).to.be.true;
let msgs: WakuMessage[] = []; let msgs: WakuMessage[] = [];
@ -77,7 +75,9 @@ describe('Waku Light Push', () => {
const messageText = 'Light Push works!'; const messageText = 'Light Push works!';
const message = WakuMessage.fromUtf8String(messageText); const message = WakuMessage.fromUtf8String(messageText);
const pushResponse = await waku.lightPush.push(nimPeerId, message); const pushResponse = await waku.lightPush.push(message, {
peerId: nimPeerId,
});
expect(pushResponse?.isSuccess).to.be.true; expect(pushResponse?.isSuccess).to.be.true;
let msgs: WakuMessage[] = []; let msgs: WakuMessage[] = [];

View File

@ -5,6 +5,7 @@ import Libp2p from 'libp2p';
import PeerId from 'peer-id'; import PeerId from 'peer-id';
import { PushResponse } from '../../proto/waku/v2/light_push'; import { PushResponse } from '../../proto/waku/v2/light_push';
import { selectRandomPeer } from '../select_peer';
import { WakuMessage } from '../waku_message'; import { WakuMessage } from '../waku_message';
import { DefaultPubsubTopic } from '../waku_relay'; import { DefaultPubsubTopic } from '../waku_relay';
@ -25,6 +26,11 @@ export interface CreateOptions {
pubsubTopic?: string; pubsubTopic?: string;
} }
export interface PushOptions {
peerId?: PeerId;
pubsubTopic?: string;
}
/** /**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/ */
@ -40,12 +46,17 @@ export class WakuLightPush {
} }
async push( async push(
peerId: PeerId,
message: WakuMessage, message: WakuMessage,
pubsubTopic: string = this.pubsubTopic opts?: PushOptions
): Promise<PushResponse | null> { ): Promise<PushResponse | null> {
const peer = this.libp2p.peerStore.get(peerId); let peer;
if (!peer) throw 'Peer is unknown'; if (opts?.peerId) {
peer = this.libp2p.peerStore.get(opts.peerId);
if (!peer) throw 'Peer is unknown';
} else {
peer = selectRandomPeer(this.libp2p, LightPushCodec);
}
if (!peer) throw 'No peer available';
if (!peer.protocols.includes(LightPushCodec)) if (!peer.protocols.includes(LightPushCodec))
throw 'Peer does not register waku light push protocol'; throw 'Peer does not register waku light push protocol';
@ -54,6 +65,9 @@ export class WakuLightPush {
const { stream } = await connection.newStream(LightPushCodec); const { stream } = await connection.newStream(LightPushCodec);
try { try {
const pubsubTopic = opts?.pubsubTopic
? opts.pubsubTopic
: this.pubsubTopic;
const query = PushRPC.createRequest(message, pubsubTopic); const query = PushRPC.createRequest(message, pubsubTopic);
const res = await pipe( const res = await pipe(
[query.encode()], [query.encode()],

View File

@ -140,6 +140,30 @@ describe('Waku Relay', () => {
expect(allMessages[1].version).to.eq(barMessage.version); expect(allMessages[1].version).to.eq(barMessage.version);
expect(allMessages[1].payloadAsUtf8).to.eq(barMessageText); expect(allMessages[1].payloadAsUtf8).to.eq(barMessageText);
}); });
it('Delete observer', async function () {
this.timeout(10000);
const messageText =
'Published on content topic with added then deleted observer';
const message = WakuMessage.fromUtf8String(
messageText,
'added-then-deleted-observer'
);
// The promise **fails** if we receive a message on this observer.
const receivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve, reject) => {
waku2.relay.addObserver(reject, ['added-then-deleted-observer']);
waku2.relay.deleteObserver(reject, ['added-then-deleted-observer']);
setTimeout(resolve, 500);
}
);
await waku1.relay.send(message);
await receivedMsgPromise;
// If it does not throw then we are good.
});
}); });
describe('Custom pubsub topic', () => { describe('Custom pubsub topic', () => {

View File

@ -65,7 +65,7 @@ export class WakuRelay extends Gossipsub implements Pubsub {
* Observers under key "" are always called. * Observers under key "" are always called.
*/ */
public observers: { public observers: {
[contentTopic: string]: Array<(message: WakuMessage) => void>; [contentTopic: string]: Set<(message: WakuMessage) => void>;
}; };
constructor( constructor(
@ -131,15 +131,37 @@ export class WakuRelay extends Gossipsub implements Pubsub {
): void { ): void {
if (contentTopics.length === 0) { if (contentTopics.length === 0) {
if (!this.observers['']) { if (!this.observers['']) {
this.observers[''] = []; this.observers[''] = new Set();
} }
this.observers[''].push(callback); this.observers[''].add(callback);
} else { } else {
contentTopics.forEach((contentTopic) => { contentTopics.forEach((contentTopic) => {
if (!this.observers[contentTopic]) { if (!this.observers[contentTopic]) {
this.observers[contentTopic] = []; this.observers[contentTopic] = new Set();
}
this.observers[contentTopic].add(callback);
});
}
}
/**
* Remove an observer of new messages received via waku relay.
* Useful to ensure the same observer is not registered several time
* (e.g when loading React components)
*/
deleteObserver(
callback: (message: WakuMessage) => void,
contentTopics: string[] = []
): void {
if (contentTopics.length === 0) {
if (this.observers['']) {
this.observers[''].delete(callback);
}
} else {
contentTopics.forEach((contentTopic) => {
if (this.observers[contentTopic]) {
this.observers[contentTopic].delete(callback);
} }
this.observers[contentTopic].push(callback);
}); });
} }
} }

View File

@ -39,10 +39,7 @@ describe('Waku Store', () => {
waku.libp2p.peerStore.once('change:protocols', resolve); waku.libp2p.peerStore.once('change:protocols', resolve);
}); });
const nimPeerId = await nimWaku.getPeerId();
const messages = await waku.store.queryHistory({ const messages = await waku.store.queryHistory({
peerId: nimPeerId,
contentTopics: [], contentTopics: [],
}); });
@ -76,10 +73,7 @@ describe('Waku Store', () => {
waku.libp2p.peerStore.once('change:protocols', resolve); waku.libp2p.peerStore.once('change:protocols', resolve);
}); });
const nimPeerId = await nimWaku.getPeerId();
const messages = await waku.store.queryHistory({ const messages = await waku.store.queryHistory({
peerId: nimPeerId,
contentTopics: [DefaultContentTopic], contentTopics: [DefaultContentTopic],
direction: Direction.FORWARD, direction: Direction.FORWARD,
}); });

View File

@ -4,6 +4,7 @@ import pipe from 'it-pipe';
import Libp2p from 'libp2p'; import Libp2p from 'libp2p';
import PeerId from 'peer-id'; import PeerId from 'peer-id';
import { selectRandomPeer } from '../select_peer';
import { WakuMessage } from '../waku_message'; import { WakuMessage } from '../waku_message';
import { DefaultPubsubTopic } from '../waku_relay'; import { DefaultPubsubTopic } from '../waku_relay';
@ -26,7 +27,7 @@ export interface CreateOptions {
} }
export interface QueryOptions { export interface QueryOptions {
peerId: PeerId; peerId?: PeerId;
contentTopics: string[]; contentTopics: string[];
pubsubTopic?: string; pubsubTopic?: string;
direction?: Direction; direction?: Direction;
@ -70,8 +71,14 @@ export class WakuStore {
options options
); );
const peer = this.libp2p.peerStore.get(opts.peerId); let peer;
if (!peer) throw 'Peer is unknown'; if (opts.peerId) {
peer = this.libp2p.peerStore.get(opts.peerId);
if (!peer) throw 'Peer is unknown';
} else {
peer = selectRandomPeer(this.libp2p, StoreCodec);
}
if (!peer) throw 'No peer available';
if (!peer.protocols.includes(StoreCodec)) if (!peer.protocols.includes(StoreCodec))
throw 'Peer does not register waku store protocol'; throw 'Peer does not register waku store protocol';
const connection = this.libp2p.connectionManager.get(peer.id); const connection = this.libp2p.connectionManager.get(peer.id);