diff --git a/.cspell.json b/.cspell.json index 2755a7b052..8ed6e3062c 100644 --- a/.cspell.json +++ b/.cspell.json @@ -15,8 +15,10 @@ "esnext", "execa", "exponentiate", + "fanout", "globby", "gossipsub", + "lastpub", "libauth", "libp", "mkdir", diff --git a/src/lib/get_waku_peers.ts b/src/lib/get_waku_peers.ts new file mode 100644 index 0000000000..43ebbadc70 --- /dev/null +++ b/src/lib/get_waku_peers.ts @@ -0,0 +1,47 @@ +import { shuffle } from 'libp2p-gossipsub/src/utils'; + +import { CODEC, WakuRelayPubsub } from './waku_relay'; + +/** + * Given a topic, returns up to count peers subscribed to that topic + * that pass an optional filter function + * + * @param {Gossipsub} router + * @param {String} topic + * @param {Number} count + * @param {Function} [filter] a function to filter acceptable peers + * @returns {Set} + * + */ +export function getWakuPeers( + router: WakuRelayPubsub, + topic: string, + count: number, + filter: (id: string) => boolean = () => true +): Set { + const peersInTopic = router.topics.get(topic); + if (!peersInTopic) { + return new Set(); + } + + // Adds all peers using our protocol + // that also pass the filter function + let peers: string[] = []; + peersInTopic.forEach((id) => { + const peerStreams = router.peers.get(id); + if (!peerStreams) { + return; + } + if (peerStreams.protocol == CODEC && filter(id)) { + peers.push(id); + } + }); + + // Pseudo-randomly shuffles peers + peers = shuffle(peers); + if (count > 0 && peers.length > count) { + peers = peers.slice(0, count); + } + + return new Set(peers); +} diff --git a/src/lib/waku_relay.spec.ts b/src/lib/waku_relay.spec.ts index 7f013f8ccd..549b239b61 100644 --- a/src/lib/waku_relay.spec.ts +++ b/src/lib/waku_relay.spec.ts @@ -77,8 +77,13 @@ describe('Waku Relay', () => { ); const multiAddrWithId = localMultiaddr + '/p2p/' + peerId; - nimWaku = new NimWaku(this.test!.title); + nimWaku = new NimWaku(this.test!.ctx!.currentTest!.title); await nimWaku.start({ staticnode: multiAddrWithId }); + + await waku.relay.subscribe(); + await new Promise((resolve) => + waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve) + ); }); afterEach(async function () { @@ -97,10 +102,6 @@ describe('Waku Relay', () => { this.timeout(5000); const message = Message.fromUtf8String('This is a message'); - // TODO: nim-waku does follow the `StrictNoSign` policy hence we need to change - // it for nim-waku to process our messages. Can be removed once - // https://github.com/status-im/nim-waku/issues/422 is fixed - waku.libp2p.pubsub.globalSignaturePolicy = 'StrictSign'; await waku.relay.publish(message); diff --git a/src/lib/waku_relay.ts b/src/lib/waku_relay.ts index 7239a05a56..76a03caec3 100644 --- a/src/lib/waku_relay.ts +++ b/src/lib/waku_relay.ts @@ -3,6 +3,7 @@ import { Libp2p } from 'libp2p-gossipsub/src/interfaces'; import Pubsub from 'libp2p-interfaces/src/pubsub'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; +import { getWakuPeers } from './get_waku_peers'; import { Message } from './waku_message'; export const CODEC = '/vac/waku/relay/2.0.0-beta2'; @@ -30,6 +31,63 @@ export class WakuRelayPubsub extends Gossipsub { // implementing WakuRelay from scratch. Object.assign(this, { multicodecs }); } + + /** + * Join topic + * @param {string} topic + * @returns {void} + * @override + */ + join(topic: string): void { + if (!this.started) { + throw new Error('WakuRelayPubsub has not started'); + } + + const fanoutPeers = this.fanout.get(topic); + if (fanoutPeers) { + // these peers have a score above the publish threshold, which may be negative + // so drop the ones with a negative score + fanoutPeers.forEach((id) => { + if (this.score.score(id) < 0) { + fanoutPeers.delete(id); + } + }); + if (fanoutPeers.size < this._options.D) { + // we need more peers; eager, as this would get fixed in the next heartbeat + getWakuPeers( + this, + topic, + this._options.D - fanoutPeers.size, + (id: string): boolean => { + // filter our current peers, direct peers, and peers with negative scores + return ( + !fanoutPeers.has(id) && + !this.direct.has(id) && + this.score.score(id) >= 0 + ); + } + ).forEach((id) => fanoutPeers.add(id)); + } + this.mesh.set(topic, fanoutPeers); + this.fanout.delete(topic); + this.lastpub.delete(topic); + } else { + const peers = getWakuPeers( + this, + topic, + this._options.D, + (id: string): boolean => { + // filter direct peers and peers with negative score + return !this.direct.has(id) && this.score.score(id) >= 0; + } + ); + this.mesh.set(topic, peers); + } + this.mesh.get(topic)!.forEach((id) => { + this.log('JOIN: Add mesh link to %s in %s', id, topic); + this._sendGraft(id, topic); + }); + } } // TODO: Implement dial for an address with format '/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAkyzsXzENw5XBDYEQQAeQTCYjBJpMLgBmEXuwbtcrgxBJ4'