Auto select peer if none provided for store and light push protocols

This commit is contained in:
Franck Royer 2021-06-16 23:37:13 +10:00
parent 60eb473047
commit 939b5fb20a
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
7 changed files with 56 additions and 36 deletions

View File

@ -7,9 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## Added
### Added
- `WakuRelay.deleteObserver` to allow removal of observers, useful when a React component add observers when mounting and needs to delete it when unmounting.
### Changed
- **Breaking**: Auto select peer if none provided for store and light push protocols.
## [0.7.0] - 2021-06-15
### Changed

View File

@ -6,7 +6,6 @@ import {
Direction,
Environment,
getStatusFleetNodes,
LightPushCodec,
Protocol,
StoreCodec,
Waku,
@ -14,7 +13,6 @@ import {
} from 'js-waku';
import TCP from 'libp2p-tcp';
import { multiaddr, Multiaddr } from 'multiaddr';
import PeerId from 'peer-id';
const ChatContentTopic = '/toy-chat/2/huilong/proto';
@ -100,20 +98,6 @@ export default async function startChat(): Promise<void> {
}
);
let lightPushNode: PeerId | undefined = undefined;
// Select a node for light pushing (any node).
if (opts.lightPush) {
waku.libp2p.peerStore.on(
'change:protocols',
async ({ peerId, protocols }) => {
if (!lightPushNode && protocols.includes(LightPushCodec)) {
console.log(`Using ${peerId.toB58String()} to light push messages`);
lightPushNode = peerId;
}
}
);
}
console.log('Ready to chat!');
rl.prompt();
for await (const line of rl) {
@ -121,8 +105,8 @@ export default async function startChat(): Promise<void> {
const chatMessage = ChatMessage.fromUtf8String(new Date(), nick, line);
const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic);
if (lightPushNode && opts.lightPush) {
await waku.lightPush.push(lightPushNode, msg);
if (opts.lightPush) {
await waku.lightPush.push(msg);
} else {
await waku.relay.send(msg);
}

18
src/lib/select_peer.ts Normal file
View File

@ -0,0 +1,18 @@
import Libp2p from 'libp2p';
import { Peer } from 'libp2p/src/peer-store';
/**
* Returns a pseudo-random peer that supports the given protocol.
* Useful for protocols such as store and light push
*/
export function selectRandomPeer(
libp2p: Libp2p,
protocol: string
): Peer | undefined {
const allPeers = Array.from(libp2p.peerStore.peers.values());
const size = allPeers.length;
const peers = allPeers.filter((peer) => peer.protocols.includes(protocol));
if (peers.length === 0) return;
const index = Math.round(Math.random() * (size - 1));
return allPeers[index];
}

View File

@ -32,12 +32,10 @@ describe('Waku Light Push', () => {
waku.libp2p.peerStore.once('change:protocols', resolve);
});
const nimPeerId = await nimWaku.getPeerId();
const messageText = 'Light Push works!';
const message = WakuMessage.fromUtf8String(messageText);
const pushResponse = await waku.lightPush.push(nimPeerId, message);
const pushResponse = await waku.lightPush.push(message);
expect(pushResponse?.isSuccess).to.be.true;
let msgs: WakuMessage[] = [];
@ -77,7 +75,9 @@ describe('Waku Light Push', () => {
const messageText = 'Light Push works!';
const message = WakuMessage.fromUtf8String(messageText);
const pushResponse = await waku.lightPush.push(nimPeerId, message);
const pushResponse = await waku.lightPush.push(message, {
peerId: nimPeerId,
});
expect(pushResponse?.isSuccess).to.be.true;
let msgs: WakuMessage[] = [];

View File

@ -5,6 +5,7 @@ import Libp2p from 'libp2p';
import PeerId from 'peer-id';
import { PushResponse } from '../../proto/waku/v2/light_push';
import { selectRandomPeer } from '../select_peer';
import { WakuMessage } from '../waku_message';
import { DefaultPubsubTopic } from '../waku_relay';
@ -25,6 +26,11 @@ export interface CreateOptions {
pubsubTopic?: string;
}
export interface PushOptions {
peerId?: PeerId;
pubsubTopic?: string;
}
/**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
@ -40,12 +46,17 @@ export class WakuLightPush {
}
async push(
peerId: PeerId,
message: WakuMessage,
pubsubTopic: string = this.pubsubTopic
opts?: PushOptions
): Promise<PushResponse | null> {
const peer = this.libp2p.peerStore.get(peerId);
if (!peer) throw 'Peer is unknown';
let peer;
if (opts?.peerId) {
peer = this.libp2p.peerStore.get(opts.peerId);
if (!peer) throw 'Peer is unknown';
} else {
peer = selectRandomPeer(this.libp2p, LightPushCodec);
}
if (!peer) throw 'No peer available';
if (!peer.protocols.includes(LightPushCodec))
throw 'Peer does not register waku light push protocol';
@ -54,6 +65,9 @@ export class WakuLightPush {
const { stream } = await connection.newStream(LightPushCodec);
try {
const pubsubTopic = opts?.pubsubTopic
? opts.pubsubTopic
: this.pubsubTopic;
const query = PushRPC.createRequest(message, pubsubTopic);
const res = await pipe(
[query.encode()],

View File

@ -39,10 +39,7 @@ describe('Waku Store', () => {
waku.libp2p.peerStore.once('change:protocols', resolve);
});
const nimPeerId = await nimWaku.getPeerId();
const messages = await waku.store.queryHistory({
peerId: nimPeerId,
contentTopics: [],
});
@ -76,10 +73,7 @@ describe('Waku Store', () => {
waku.libp2p.peerStore.once('change:protocols', resolve);
});
const nimPeerId = await nimWaku.getPeerId();
const messages = await waku.store.queryHistory({
peerId: nimPeerId,
contentTopics: [DefaultContentTopic],
direction: Direction.FORWARD,
});

View File

@ -4,6 +4,7 @@ import pipe from 'it-pipe';
import Libp2p from 'libp2p';
import PeerId from 'peer-id';
import { selectRandomPeer } from '../select_peer';
import { WakuMessage } from '../waku_message';
import { DefaultPubsubTopic } from '../waku_relay';
@ -26,7 +27,7 @@ export interface CreateOptions {
}
export interface QueryOptions {
peerId: PeerId;
peerId?: PeerId;
contentTopics: string[];
pubsubTopic?: string;
direction?: Direction;
@ -70,8 +71,14 @@ export class WakuStore {
options
);
const peer = this.libp2p.peerStore.get(opts.peerId);
if (!peer) throw 'Peer is unknown';
let peer;
if (opts.peerId) {
peer = this.libp2p.peerStore.get(opts.peerId);
if (!peer) throw 'Peer is unknown';
} else {
peer = selectRandomPeer(this.libp2p, StoreCodec);
}
if (!peer) throw 'No peer available';
if (!peer.protocols.includes(StoreCodec))
throw 'Peer does not register waku store protocol';
const connection = this.libp2p.connectionManager.get(peer.id);