Override Gossipsub.join` function to consider waku peers

Instead of gossipsub peers, this allows js to publish messages to nim.
This commit is contained in:
Franck Royer 2021-03-24 12:18:10 +11:00
parent 71f3e9aa1f
commit e89b4ca437
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
4 changed files with 112 additions and 4 deletions

View File

@ -15,8 +15,10 @@
"esnext", "esnext",
"execa", "execa",
"exponentiate", "exponentiate",
"fanout",
"globby", "globby",
"gossipsub", "gossipsub",
"lastpub",
"libauth", "libauth",
"libp", "libp",
"mkdir", "mkdir",

47
src/lib/get_waku_peers.ts Normal file
View File

@ -0,0 +1,47 @@
import { shuffle } from 'libp2p-gossipsub/src/utils';
import { CODEC, WakuRelayPubsub } from './waku_relay';
/**
* Given a topic, returns up to count peers subscribed to that topic
* that pass an optional filter function
*
* @param {Gossipsub} router
* @param {String} topic
* @param {Number} count
* @param {Function} [filter] a function to filter acceptable peers
* @returns {Set<string>}
*
*/
export function getWakuPeers(
router: WakuRelayPubsub,
topic: string,
count: number,
filter: (id: string) => boolean = () => true
): Set<string> {
const peersInTopic = router.topics.get(topic);
if (!peersInTopic) {
return new Set();
}
// Adds all peers using our protocol
// that also pass the filter function
let peers: string[] = [];
peersInTopic.forEach((id) => {
const peerStreams = router.peers.get(id);
if (!peerStreams) {
return;
}
if (peerStreams.protocol == CODEC && filter(id)) {
peers.push(id);
}
});
// Pseudo-randomly shuffles peers
peers = shuffle(peers);
if (count > 0 && peers.length > count) {
peers = peers.slice(0, count);
}
return new Set(peers);
}

View File

@ -79,6 +79,11 @@ describe('Waku Relay', () => {
nimWaku = new NimWaku(this.test!.ctx!.currentTest!.title); nimWaku = new NimWaku(this.test!.ctx!.currentTest!.title);
await nimWaku.start({ staticnode: multiAddrWithId }); await nimWaku.start({ staticnode: multiAddrWithId });
await waku.relay.subscribe();
await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
}); });
afterEach(async function () { afterEach(async function () {
@ -97,10 +102,6 @@ describe('Waku Relay', () => {
this.timeout(5000); this.timeout(5000);
const message = Message.fromUtf8String('This is a message'); const message = Message.fromUtf8String('This is a message');
// TODO: nim-waku does follow the `StrictNoSign` policy hence we need to change
// it for nim-waku to process our messages. Can be removed once
// https://github.com/status-im/nim-waku/issues/422 is fixed
waku.libp2p.pubsub.globalSignaturePolicy = 'StrictSign';
await waku.relay.publish(message); await waku.relay.publish(message);

View File

@ -3,6 +3,7 @@ import { Libp2p } from 'libp2p-gossipsub/src/interfaces';
import Pubsub from 'libp2p-interfaces/src/pubsub'; import Pubsub from 'libp2p-interfaces/src/pubsub';
import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
import { getWakuPeers } from './get_waku_peers';
import { Message } from './waku_message'; import { Message } from './waku_message';
export const CODEC = '/vac/waku/relay/2.0.0-beta2'; export const CODEC = '/vac/waku/relay/2.0.0-beta2';
@ -30,6 +31,63 @@ export class WakuRelayPubsub extends Gossipsub {
// implementing WakuRelay from scratch. // implementing WakuRelay from scratch.
Object.assign(this, { multicodecs }); Object.assign(this, { multicodecs });
} }
/**
* Join topic
* @param {string} topic
* @returns {void}
* @override
*/
join(topic: string): void {
if (!this.started) {
throw new Error('WakuRelayPubsub has not started');
}
const fanoutPeers = this.fanout.get(topic);
if (fanoutPeers) {
// these peers have a score above the publish threshold, which may be negative
// so drop the ones with a negative score
fanoutPeers.forEach((id) => {
if (this.score.score(id) < 0) {
fanoutPeers.delete(id);
}
});
if (fanoutPeers.size < this._options.D) {
// we need more peers; eager, as this would get fixed in the next heartbeat
getWakuPeers(
this,
topic,
this._options.D - fanoutPeers.size,
(id: string): boolean => {
// filter our current peers, direct peers, and peers with negative scores
return (
!fanoutPeers.has(id) &&
!this.direct.has(id) &&
this.score.score(id) >= 0
);
}
).forEach((id) => fanoutPeers.add(id));
}
this.mesh.set(topic, fanoutPeers);
this.fanout.delete(topic);
this.lastpub.delete(topic);
} else {
const peers = getWakuPeers(
this,
topic,
this._options.D,
(id: string): boolean => {
// filter direct peers and peers with negative score
return !this.direct.has(id) && this.score.score(id) >= 0;
}
);
this.mesh.set(topic, peers);
}
this.mesh.get(topic)!.forEach((id) => {
this.log('JOIN: Add mesh link to %s in %s', id, topic);
this._sendGraft(id, topic);
});
}
} }
// TODO: Implement dial for an address with format '/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAkyzsXzENw5XBDYEQQAeQTCYjBJpMLgBmEXuwbtcrgxBJ4' // TODO: Implement dial for an address with format '/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAkyzsXzENw5XBDYEQQAeQTCYjBJpMLgBmEXuwbtcrgxBJ4'