From f35322967dc8569b0930fdf073b72ecb4d5c4c79 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 1 Apr 2021 15:46:14 +1100 Subject: [PATCH 1/8] Clarify that incoming connections are actually not necessary --- README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index d215c9f6c3..8c7ce3cb96 100644 --- a/README.md +++ b/README.md @@ -16,10 +16,12 @@ To run the chat app: ```shell npm install -npm run chat:app -- --staticNode /ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ --listenAddr /ip4/0.0.0.0/tcp/55123 +npm run chat:app -- --staticNode /ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ ``` -The `--listenAddr` parameter is optional, however [NAT passthrough](https://github.com/status-im/js-waku/issues/12) is not yet supported, so you'll need the listening port to be open to receive messages from the fleet. +You can also specify an optional `listenAddr` parameter (.e.g `--listenAddr /ip4/0.0.0.0/tcp/55123`). +This is only useful if you want a remote node to dial to your chat app, +it is not necessary in normal usage when you just connect to the fleet. ## Contributing From 95b88d2815f808c33d45e4063e01ff9622becbb3 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 1 Apr 2021 15:47:34 +1100 Subject: [PATCH 2/8] Check against gossipsub protocol was also done in publish Meaning it would sometimes fail to recognize another waku node at a subscriber of a topic, depending on timing with the `subscribe` call. --- src/lib/waku_relay.spec.ts | 13 +------ src/lib/waku_relay.ts | 74 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 73 insertions(+), 14 deletions(-) diff --git a/src/lib/waku_relay.spec.ts b/src/lib/waku_relay.spec.ts index bedcacd766..cbb6d14729 100644 --- a/src/lib/waku_relay.spec.ts +++ b/src/lib/waku_relay.spec.ts @@ -73,24 +73,13 @@ describe('Waku Relay', () => { expect(protocols.findIndex((value) => value.match(/sub/))).to.eq(-1); }); - // TODO: Fix this - it.skip('Publish', async function () { + it('Publish', async function () { this.timeout(10000); const message = WakuMessage.fromUtf8String('JS to JS communication works'); - // waku.libp2p.pubsub.globalSignaturePolicy = 'StrictSign'; const receivedPromise = waitForNextData(waku2.libp2p.pubsub); - await Promise.all([ - new Promise((resolve) => - waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ), - new Promise((resolve) => - waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ), - ]); - await waku1.relay.publish(message); const receivedMsg = await receivedPromise; diff --git a/src/lib/waku_relay.ts b/src/lib/waku_relay.ts index 1069e1f159..7e10757f71 100644 --- a/src/lib/waku_relay.ts +++ b/src/lib/waku_relay.ts @@ -1,6 +1,7 @@ import Gossipsub from 'libp2p-gossipsub'; import { Libp2p } from 'libp2p-gossipsub/src/interfaces'; -import Pubsub from 'libp2p-interfaces/src/pubsub'; +import { createGossipRpc, messageIdToString } from 'libp2p-gossipsub/src/utils'; +import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import { getWakuPeers } from './get_waku_peers'; @@ -87,9 +88,78 @@ export class WakuRelayPubsub extends Gossipsub { this._sendGraft(id, topic); }); } + + /** + * Publish messages + * + * @override + * @param {InMessage} msg + * @returns {void} + */ + async _publish(msg: InMessage): Promise { + if (msg.receivedFrom !== this.peerId.toB58String()) { + this.score.deliverMessage(msg); + this.gossipTracer.deliverMessage(msg); + } + + const msgID = this.getMsgId(msg); + const msgIdStr = messageIdToString(msgID); + // put in seen cache + this.seenCache.put(msgIdStr); + + this.messageCache.put(msg); + + const toSend = new Set(); + msg.topicIDs.forEach((topic) => { + const peersInTopic = this.topics.get(topic); + if (!peersInTopic) { + return; + } + + // direct peers + this.direct.forEach((id) => { + toSend.add(id); + }); + + let meshPeers = this.mesh.get(topic); + if (!meshPeers || !meshPeers.size) { + // We are not in the mesh for topic, use fanout peers + meshPeers = this.fanout.get(topic); + if (!meshPeers) { + // If we are not in the fanout, then pick peers in topic above the publishThreshold + const peers = getWakuPeers(this, topic, this._options.D, (id) => { + return ( + this.score.score(id) >= + this._options.scoreThresholds.publishThreshold + ); + }); + + if (peers.size > 0) { + meshPeers = peers; + this.fanout.set(topic, peers); + } else { + meshPeers = new Set(); + } + } + // Store the latest publishing time + this.lastpub.set(topic, this._now()); + } + + meshPeers!.forEach((peer) => { + toSend.add(peer); + }); + }); + // Publish messages to peers + const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]); + toSend.forEach((id) => { + if (id === msg.from) { + return; + } + this._sendRpc(id, rpc); + }); + } } -// TODO: Implement dial for an address with format '/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAkyzsXzENw5XBDYEQQAeQTCYjBJpMLgBmEXuwbtcrgxBJ4' // This class provides an interface to execute the waku relay protocol export class WakuRelay { constructor(private pubsub: Pubsub) {} From ed5a36309695b44c27b1f81eb41c2706b3cd5e32 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 1 Apr 2021 16:18:06 +1100 Subject: [PATCH 3/8] Ensure that heartbeat checks for waku, and not gossipsub, peers --- .cspell.json | 11 + src/lib/waku_relay.ts | 5 + src/lib/waku_relay/constants.ts | 33 +++ src/lib/waku_relay/index.ts | 2 + src/lib/waku_relay/relay_heartbeat.ts | 368 ++++++++++++++++++++++++++ 5 files changed, 419 insertions(+) create mode 100644 src/lib/waku_relay/constants.ts create mode 100644 src/lib/waku_relay/index.ts create mode 100644 src/lib/waku_relay/relay_heartbeat.ts diff --git a/.cspell.json b/.cspell.json index 8ed6e3062c..dcdc79e7f6 100644 --- a/.cspell.json +++ b/.cspell.json @@ -3,6 +3,8 @@ "$schema": "https://raw.githubusercontent.com/streetsidesoftware/cspell/master/cspell.schema.json", "language": "en", "words": [ + "backoff", + "backoffs", "bitjson", "bitauth", "bufbuild", @@ -11,6 +13,8 @@ "codecov", "commitlint", "dependabot", + "Dout", + "Dscore", "editorconfig", "esnext", "execa", @@ -18,6 +22,11 @@ "fanout", "globby", "gossipsub", + "iasked", + "ihave", + "ihaves", + "ineed", + "iwant", "lastpub", "libauth", "libp", @@ -28,6 +37,7 @@ "mplex", "muxer", "nodekey", + "peerhave", "prettierignore", "protobuf", "protoc", @@ -38,6 +48,7 @@ "submodules", "transpiled", "typedoc", + "unmounts", "untracked", "upgrader", "waku", diff --git a/src/lib/waku_relay.ts b/src/lib/waku_relay.ts index 7e10757f71..87d2727281 100644 --- a/src/lib/waku_relay.ts +++ b/src/lib/waku_relay.ts @@ -6,6 +6,7 @@ import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import { getWakuPeers } from './get_waku_peers'; import { WakuMessage } from './waku_message'; +import { RelayHeartbeat } from './waku_relay/relay_heartbeat'; export const CODEC = '/vac/waku/relay/2.0.0-beta2'; @@ -14,6 +15,8 @@ export const TOPIC = '/waku/2/default-waku/proto'; // This is the class to pass to libp2p as pubsub protocol export class WakuRelayPubsub extends Gossipsub { + heartbeat: RelayHeartbeat; + /** * * @param libp2p: Libp2p @@ -25,6 +28,8 @@ export class WakuRelayPubsub extends Gossipsub { globalSignaturePolicy: SignaturePolicy.StrictNoSign, }); + this.heartbeat = new RelayHeartbeat(this); + const multicodecs = [CODEC]; // This is the downside of using `libp2p-gossipsub` instead of diff --git a/src/lib/waku_relay/constants.ts b/src/lib/waku_relay/constants.ts new file mode 100644 index 0000000000..515f2e4c4f --- /dev/null +++ b/src/lib/waku_relay/constants.ts @@ -0,0 +1,33 @@ +export const second = 1000; +export const minute = 60 * second; + +/** + * GossipsubHeartbeatInitialDelay is the short delay before the heartbeat timer begins + * after the router is initialized. + */ +export const RelayHeartbeatInitialDelay = 100; + +/** + * RelayHeartbeatInterval controls the time between heartbeats. + */ +export const RelayHeartbeatInterval = second; + +/** + * RelayFanoutTTL controls how long we keep track of the fanout state. If it's been + * RelayFanoutTTL since we've published to a topic that we're not subscribed to, + * we'll delete the fanout map for that topic. + */ +export const RelayFanoutTTL = minute; + +/** + * RelayOpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh + * with opportunistic grafting. Every RelayOpportunisticGraftTicks we will attempt to select some + * high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls + * below a threshold + */ +export const RelayOpportunisticGraftTicks = 60; + +/** + * RelayOpportunisticGraftPeers is the number of peers to opportunistically graft. + */ +export const RelayOpportunisticGraftPeers = 2; diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts new file mode 100644 index 0000000000..e93d868e54 --- /dev/null +++ b/src/lib/waku_relay/index.ts @@ -0,0 +1,2 @@ +export * from './constants'; +export * from './relay_heartbeat'; diff --git a/src/lib/waku_relay/relay_heartbeat.ts b/src/lib/waku_relay/relay_heartbeat.ts new file mode 100644 index 0000000000..a910a56f8b --- /dev/null +++ b/src/lib/waku_relay/relay_heartbeat.ts @@ -0,0 +1,368 @@ +import Gossipsub from 'libp2p-gossipsub'; +import { Heartbeat } from 'libp2p-gossipsub/src/heartbeat'; +import { shuffle } from 'libp2p-gossipsub/src/utils'; + +import { getWakuPeers } from '../get_waku_peers'; + +import * as constants from './constants'; + +export class RelayHeartbeat extends Heartbeat { + /** + * @param {Object} gossipsub + * @constructor + */ + constructor(gossipsub: Gossipsub) { + super(gossipsub); + } + + start(): void { + if (this._heartbeatTimer) { + return; + } + + const heartbeat = this._heartbeat.bind(this); + + const timeout = setTimeout(() => { + heartbeat(); + this._heartbeatTimer!.runPeriodically( + heartbeat, + constants.RelayHeartbeatInterval + ); + }, constants.RelayHeartbeatInitialDelay); + + this._heartbeatTimer = { + _intervalId: undefined, + runPeriodically: (fn, period) => { + this._heartbeatTimer!._intervalId = setInterval(fn, period); + }, + cancel: () => { + clearTimeout(timeout); + clearInterval(this._heartbeatTimer!._intervalId as NodeJS.Timeout); + }, + }; + } + + /** + * Unmounts the gossipsub protocol and shuts down every connection + * @override + * @returns {void} + */ + stop(): void { + if (!this._heartbeatTimer) { + return; + } + + this._heartbeatTimer.cancel(); + this._heartbeatTimer = null; + } + + /** + * Maintains the mesh and fanout maps in gossipsub. + * + * @returns {void} + */ + _heartbeat(): void { + const { D, Dlo, Dhi, Dscore, Dout } = this.gossipsub._options; + this.gossipsub.heartbeatTicks++; + + // cache scores through the heartbeat + const scores = new Map(); + const getScore = (id: string): number => { + let s = scores.get(id); + if (s === undefined) { + s = this.gossipsub.score.score(id); + scores.set(id, s); + } + return s; + }; + + // peer id => topic[] + const toGraft = new Map(); + // peer id => topic[] + const toPrune = new Map(); + // peer id => don't px + const noPX = new Map(); + + // clean up expired backoffs + this.gossipsub._clearBackoff(); + + // clean up peerhave/iasked counters + this.gossipsub.peerhave.clear(); + this.gossipsub.iasked.clear(); + + // apply IWANT request penalties + this.gossipsub._applyIwantPenalties(); + + // ensure direct peers are connected + this.gossipsub._directConnect(); + + // maintain the mesh for topics we have joined + this.gossipsub.mesh.forEach((peers, topic) => { + // prune/graft helper functions (defined per topic) + const prunePeer = (id: string): void => { + this.gossipsub.log( + 'HEARTBEAT: Remove mesh link to %s in %s', + id, + topic + ); + // update peer score + this.gossipsub.score.prune(id, topic); + // add prune backoff record + this.gossipsub._addBackoff(id, topic); + // remove peer from mesh + peers.delete(id); + // add to toPrune + const topics = toPrune.get(id); + if (!topics) { + toPrune.set(id, [topic]); + } else { + topics.push(topic); + } + }; + const graftPeer = (id: string): void => { + this.gossipsub.log('HEARTBEAT: Add mesh link to %s in %s', id, topic); + // update peer score + this.gossipsub.score.graft(id, topic); + // add peer to mesh + peers.add(id); + // add to toGraft + const topics = toGraft.get(id); + if (!topics) { + toGraft.set(id, [topic]); + } else { + topics.push(topic); + } + }; + + // drop all peers with negative score, without PX + peers.forEach((id) => { + const score = getScore(id); + if (score < 0) { + this.gossipsub.log( + 'HEARTBEAT: Prune peer %s with negative score: score=%d, topic=%s', + id, + score, + topic + ); + prunePeer(id); + noPX.set(id, true); + } + }); + + // do we have enough peers? + if (peers.size < Dlo) { + const backoff = this.gossipsub.backoff.get(topic); + const ineed = D - peers.size; + const peersSet = getWakuPeers(this.gossipsub, topic, ineed, (id) => { + // filter out mesh peers, direct peers, peers we are backing off, peers with negative score + return ( + !peers.has(id) && + !this.gossipsub.direct.has(id) && + (!backoff || !backoff.has(id)) && + getScore(id) >= 0 + ); + }); + + peersSet.forEach(graftPeer); + } + + // do we have to many peers? + if (peers.size > Dhi) { + let peersArray = Array.from(peers); + // sort by score + peersArray.sort((a, b) => getScore(b) - getScore(a)); + // We keep the first D_score peers by score and the remaining up to D randomly + // under the constraint that we keep D_out peers in the mesh (if we have that many) + peersArray = peersArray + .slice(0, Dscore) + .concat(shuffle(peersArray.slice(Dscore))); + + // count the outbound peers we are keeping + let outbound = 0; + peersArray.slice(0, D).forEach((p) => { + if (this.gossipsub.outbound.get(p)) { + outbound++; + } + }); + + // if it's less than D_out, bubble up some outbound peers from the random selection + if (outbound < Dout) { + const rotate = (i: number): void => { + // rotate the peersArray to the right and put the ith peer in the front + const p = peersArray[i]; + for (let j = i; j > 0; j--) { + peersArray[j] = peersArray[j - 1]; + } + peersArray[0] = p; + }; + + // first bubble up all outbound peers already in the selection to the front + if (outbound > 0) { + let ihave = outbound; + for (let i = 1; i < D && ihave > 0; i++) { + if (this.gossipsub.outbound.get(peersArray[i])) { + rotate(i); + ihave--; + } + } + } + + // now bubble up enough outbound peers outside the selection to the front + let ineed = D - outbound; + for (let i = D; i < peersArray.length && ineed > 0; i++) { + if (this.gossipsub.outbound.get(peersArray[i])) { + rotate(i); + ineed--; + } + } + } + + // prune the excess peers + peersArray.slice(D).forEach(prunePeer); + } + + // do we have enough outbound peers? + if (peers.size >= Dlo) { + // count the outbound peers we have + let outbound = 0; + peers.forEach((p) => { + if (this.gossipsub.outbound.get(p)) { + outbound++; + } + }); + + // if it's less than D_out, select some peers with outbound connections and graft them + if (outbound < Dout) { + const ineed = Dout - outbound; + const backoff = this.gossipsub.backoff.get(topic); + getWakuPeers(this.gossipsub, topic, ineed, (id: string): boolean => { + // filter our current mesh peers, direct peers, peers we are backing off, peers with negative score + return ( + !peers.has(id) && + !this.gossipsub.direct.has(id) && + (!backoff || !backoff.has(id)) && + getScore(id) >= 0 + ); + }).forEach(graftPeer); + } + } + + // should we try to improve the mesh with opportunistic grafting? + if ( + this.gossipsub.heartbeatTicks % + constants.RelayOpportunisticGraftTicks === + 0 && + peers.size > 1 + ) { + // Opportunistic grafting works as follows: we check the median score of peers in the + // mesh; if this score is below the opportunisticGraftThreshold, we select a few peers at + // random with score over the median. + // The intention is to (slowly) improve an under performing mesh by introducing good + // scoring peers that may have been gossiping at us. This allows us to get out of sticky + // situations where we are stuck with poor peers and also recover from churn of good peers. + + // now compute the median peer score in the mesh + const peersList = Array.from(peers).sort( + (a, b) => getScore(a) - getScore(b) + ); + const medianIndex = peers.size / 2; + const medianScore = getScore(peersList[medianIndex]); + + // if the median score is below the threshold, select a better peer (if any) and GRAFT + if ( + medianScore < + this.gossipsub._options.scoreThresholds.opportunisticGraftThreshold + ) { + const backoff = this.gossipsub.backoff.get(topic); + const peersToGraft = getWakuPeers( + this.gossipsub, + topic, + constants.RelayOpportunisticGraftPeers, + (id: string): boolean => { + // filter out current mesh peers, direct peers, peers we are backing off, peers below or at threshold + return ( + peers.has(id) && + !this.gossipsub.direct.has(id) && + (!backoff || !backoff.has(id)) && + getScore(id) > medianScore + ); + } + ); + peersToGraft.forEach((id) => { + this.gossipsub.log( + 'HEARTBEAT: Opportunistically graft peer %s on topic %s', + id, + topic + ); + graftPeer(id); + }); + } + } + + // 2nd arg are mesh peers excluded from gossip. We have already pushed + // messages to them, so its redundant to gossip IHAVEs. + this.gossipsub._emitGossip(topic, peers); + }); + + // expire fanout for topics we haven't published to in a while + const now = this.gossipsub._now(); + this.gossipsub.lastpub.forEach((lastpub, topic) => { + if (lastpub + constants.RelayFanoutTTL < now) { + this.gossipsub.fanout.delete(topic); + this.gossipsub.lastpub.delete(topic); + } + }); + + // maintain our fanout for topics we are publishing but we have not joined + this.gossipsub.fanout.forEach((fanoutPeers, topic) => { + // checks whether our peers are still in the topic and have a score above the publish threshold + const topicPeers = this.gossipsub.topics.get(topic); + fanoutPeers.forEach((id) => { + if ( + !topicPeers!.has(id) || + getScore(id) < + this.gossipsub._options.scoreThresholds.publishThreshold + ) { + fanoutPeers.delete(id); + } + }); + + // do we need more peers? + if (fanoutPeers.size < D) { + const ineed = D - fanoutPeers.size; + const peersSet = getWakuPeers( + this.gossipsub, + topic, + ineed, + (id: string): boolean => { + // filter out existing fanout peers, direct peers, and peers with score above the publish threshold + return ( + !fanoutPeers.has(id) && + !this.gossipsub.direct.has(id) && + getScore(id) >= + this.gossipsub._options.scoreThresholds.publishThreshold + ); + } + ); + peersSet.forEach((id) => { + fanoutPeers.add(id); + }); + } + + // 2nd arg are fanout peers excluded from gossip. + // We have already pushed messages to them, so its redundant to gossip IHAVEs + this.gossipsub._emitGossip(topic, fanoutPeers); + }); + + // send coalesced GRAFT/PRUNE messages (will piggyback gossip) + this.gossipsub._sendGraftPrune(toGraft, toPrune, noPX); + + // flush pending gossip that wasn't piggybacked above + this.gossipsub._flush(); + + // advance the message history window + this.gossipsub.messageCache.shift(); + + this.gossipsub.emit('gossipsub:heartbeat'); + } +} From 7bd48b6220e935408021896c39ed9c30bad65ed2 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 1 Apr 2021 16:37:05 +1100 Subject: [PATCH 4/8] Move all relay files under common folder --- src/lib/waku_relay.ts | 181 ----------------- src/lib/{ => waku_relay}/get_waku_peers.ts | 4 +- .../index.spec.ts} | 24 +-- src/lib/waku_relay/index.ts | 183 ++++++++++++++++++ src/lib/waku_relay/relay_heartbeat.ts | 30 +-- 5 files changed, 215 insertions(+), 207 deletions(-) delete mode 100644 src/lib/waku_relay.ts rename src/lib/{ => waku_relay}/get_waku_peers.ts (92%) rename src/lib/{waku_relay.spec.ts => waku_relay/index.spec.ts} (95%) diff --git a/src/lib/waku_relay.ts b/src/lib/waku_relay.ts deleted file mode 100644 index 87d2727281..0000000000 --- a/src/lib/waku_relay.ts +++ /dev/null @@ -1,181 +0,0 @@ -import Gossipsub from 'libp2p-gossipsub'; -import { Libp2p } from 'libp2p-gossipsub/src/interfaces'; -import { createGossipRpc, messageIdToString } from 'libp2p-gossipsub/src/utils'; -import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; -import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; - -import { getWakuPeers } from './get_waku_peers'; -import { WakuMessage } from './waku_message'; -import { RelayHeartbeat } from './waku_relay/relay_heartbeat'; - -export const CODEC = '/vac/waku/relay/2.0.0-beta2'; - -// As per waku specs, the topic is fixed, value taken from nim-waku -export const TOPIC = '/waku/2/default-waku/proto'; - -// This is the class to pass to libp2p as pubsub protocol -export class WakuRelayPubsub extends Gossipsub { - heartbeat: RelayHeartbeat; - - /** - * - * @param libp2p: Libp2p - */ - constructor(libp2p: Libp2p) { - super(libp2p, { - emitSelf: false, - // Ensure that no signature is expected in the messages. - globalSignaturePolicy: SignaturePolicy.StrictNoSign, - }); - - this.heartbeat = new RelayHeartbeat(this); - - const multicodecs = [CODEC]; - - // This is the downside of using `libp2p-gossipsub` instead of - // implementing WakuRelay from scratch. - Object.assign(this, { multicodecs }); - } - - /** - * Join topic - * @param {string} topic - * @returns {void} - * @override - */ - join(topic: string): void { - if (!this.started) { - throw new Error('WakuRelayPubsub has not started'); - } - - const fanoutPeers = this.fanout.get(topic); - if (fanoutPeers) { - // these peers have a score above the publish threshold, which may be negative - // so drop the ones with a negative score - fanoutPeers.forEach((id) => { - if (this.score.score(id) < 0) { - fanoutPeers.delete(id); - } - }); - if (fanoutPeers.size < this._options.D) { - // we need more peers; eager, as this would get fixed in the next heartbeat - getWakuPeers( - this, - topic, - this._options.D - fanoutPeers.size, - (id: string): boolean => { - // filter our current peers, direct peers, and peers with negative scores - return ( - !fanoutPeers.has(id) && - !this.direct.has(id) && - this.score.score(id) >= 0 - ); - } - ).forEach((id) => fanoutPeers.add(id)); - } - this.mesh.set(topic, fanoutPeers); - this.fanout.delete(topic); - this.lastpub.delete(topic); - } else { - const peers = getWakuPeers( - this, - topic, - this._options.D, - (id: string): boolean => { - // filter direct peers and peers with negative score - return !this.direct.has(id) && this.score.score(id) >= 0; - } - ); - this.mesh.set(topic, peers); - } - this.mesh.get(topic)!.forEach((id) => { - this.log('JOIN: Add mesh link to %s in %s', id, topic); - this._sendGraft(id, topic); - }); - } - - /** - * Publish messages - * - * @override - * @param {InMessage} msg - * @returns {void} - */ - async _publish(msg: InMessage): Promise { - if (msg.receivedFrom !== this.peerId.toB58String()) { - this.score.deliverMessage(msg); - this.gossipTracer.deliverMessage(msg); - } - - const msgID = this.getMsgId(msg); - const msgIdStr = messageIdToString(msgID); - // put in seen cache - this.seenCache.put(msgIdStr); - - this.messageCache.put(msg); - - const toSend = new Set(); - msg.topicIDs.forEach((topic) => { - const peersInTopic = this.topics.get(topic); - if (!peersInTopic) { - return; - } - - // direct peers - this.direct.forEach((id) => { - toSend.add(id); - }); - - let meshPeers = this.mesh.get(topic); - if (!meshPeers || !meshPeers.size) { - // We are not in the mesh for topic, use fanout peers - meshPeers = this.fanout.get(topic); - if (!meshPeers) { - // If we are not in the fanout, then pick peers in topic above the publishThreshold - const peers = getWakuPeers(this, topic, this._options.D, (id) => { - return ( - this.score.score(id) >= - this._options.scoreThresholds.publishThreshold - ); - }); - - if (peers.size > 0) { - meshPeers = peers; - this.fanout.set(topic, peers); - } else { - meshPeers = new Set(); - } - } - // Store the latest publishing time - this.lastpub.set(topic, this._now()); - } - - meshPeers!.forEach((peer) => { - toSend.add(peer); - }); - }); - // Publish messages to peers - const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]); - toSend.forEach((id) => { - if (id === msg.from) { - return; - } - this._sendRpc(id, rpc); - }); - } -} - -// This class provides an interface to execute the waku relay protocol -export class WakuRelay { - constructor(private pubsub: Pubsub) {} - - // At this stage we are always using the same topic so we do not pass it as a parameter - async subscribe() { - await this.pubsub.subscribe(TOPIC); - } - - async publish(message: WakuMessage) { - const msg = message.toBinary(); - await this.pubsub.publish(TOPIC, msg); - } -} diff --git a/src/lib/get_waku_peers.ts b/src/lib/waku_relay/get_waku_peers.ts similarity index 92% rename from src/lib/get_waku_peers.ts rename to src/lib/waku_relay/get_waku_peers.ts index 43ebbadc70..9dd788a955 100644 --- a/src/lib/get_waku_peers.ts +++ b/src/lib/waku_relay/get_waku_peers.ts @@ -1,6 +1,6 @@ import { shuffle } from 'libp2p-gossipsub/src/utils'; -import { CODEC, WakuRelayPubsub } from './waku_relay'; +import { CODEC, WakuRelayPubsub } from './index'; /** * Given a topic, returns up to count peers subscribed to that topic @@ -27,7 +27,7 @@ export function getWakuPeers( // Adds all peers using our protocol // that also pass the filter function let peers: string[] = []; - peersInTopic.forEach((id) => { + peersInTopic.forEach((id: string) => { const peerStreams = router.peers.get(id); if (!peerStreams) { return; diff --git a/src/lib/waku_relay.spec.ts b/src/lib/waku_relay/index.spec.ts similarity index 95% rename from src/lib/waku_relay.spec.ts rename to src/lib/waku_relay/index.spec.ts index cbb6d14729..6daab699a4 100644 --- a/src/lib/waku_relay.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -1,14 +1,14 @@ import { expect } from 'chai'; import Pubsub from 'libp2p-interfaces/src/pubsub'; -import { NOISE_KEY_1, NOISE_KEY_2 } from '../test_utils/constants'; -import { delay } from '../test_utils/delay'; -import { makeLogFileName } from '../test_utils/log_file'; -import { NimWaku } from '../test_utils/nim_waku'; +import { NOISE_KEY_1, NOISE_KEY_2 } from '../../test_utils/constants'; +import { delay } from '../../test_utils/delay'; +import { makeLogFileName } from '../../test_utils/log_file'; +import { NimWaku } from '../../test_utils/nim_waku'; +import Waku from '../waku'; +import { WakuMessage } from '../waku_message'; -import Waku from './waku'; -import { WakuMessage } from './waku_message'; -import { CODEC, TOPIC } from './waku_relay'; +import { CODEC, TOPIC } from './index'; describe('Waku Relay', () => { afterEach(function () { @@ -41,13 +41,15 @@ describe('Waku Relay', () => { await Promise.all([ new Promise((resolve) => - waku1.libp2p.pubsub.once('pubsub:subscription-change', (...args) => - resolve(args) + waku1.libp2p.pubsub.once( + 'pubsub:subscription-change', + (...args: any[]) => resolve(args) ) ), new Promise((resolve) => - waku2.libp2p.pubsub.once('pubsub:subscription-change', (...args) => - resolve(args) + waku2.libp2p.pubsub.once( + 'pubsub:subscription-change', + (...args: any[]) => resolve(args) ) ), ]); diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index e93d868e54..cd507ed797 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -1,2 +1,185 @@ +import Gossipsub from 'libp2p-gossipsub'; +import { Libp2p } from 'libp2p-gossipsub/src/interfaces'; +import { createGossipRpc, messageIdToString } from 'libp2p-gossipsub/src/utils'; +import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; +import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; + +import { WakuMessage } from '../waku_message'; + +import { getWakuPeers } from './get_waku_peers'; +import { RelayHeartbeat } from './relay_heartbeat'; + export * from './constants'; export * from './relay_heartbeat'; + +export const CODEC = '/vac/waku/relay/2.0.0-beta2'; + +// As per waku specs, the topic is fixed, value taken from nim-waku +export const TOPIC = '/waku/2/default-waku/proto'; + +// This is the class to pass to libp2p as pubsub protocol +export class WakuRelayPubsub extends Gossipsub { + heartbeat: RelayHeartbeat; + + /** + * + * @param libp2p: Libp2p + */ + constructor(libp2p: Libp2p) { + super(libp2p, { + emitSelf: false, + // Ensure that no signature is expected in the messages. + globalSignaturePolicy: SignaturePolicy.StrictNoSign, + }); + + this.heartbeat = new RelayHeartbeat(this); + + const multicodecs = [CODEC]; + + // This is the downside of using `libp2p-gossipsub` instead of + // implementing WakuRelay from scratch. + Object.assign(this, { multicodecs }); + } + + /** + * Join topic + * @param {string} topic + * @returns {void} + * @override + */ + join(topic: string): void { + if (!this.started) { + throw new Error('WakuRelayPubsub has not started'); + } + + const fanoutPeers = this.fanout.get(topic); + if (fanoutPeers) { + // these peers have a score above the publish threshold, which may be negative + // so drop the ones with a negative score + fanoutPeers.forEach((id) => { + if (this.score.score(id) < 0) { + fanoutPeers.delete(id); + } + }); + if (fanoutPeers.size < this._options.D) { + // we need more peers; eager, as this would get fixed in the next heartbeat + getWakuPeers( + this, + topic, + this._options.D - fanoutPeers.size, + (id: string): boolean => { + // filter our current peers, direct peers, and peers with negative scores + return ( + !fanoutPeers.has(id) && + !this.direct.has(id) && + this.score.score(id) >= 0 + ); + } + ).forEach((id) => fanoutPeers.add(id)); + } + this.mesh.set(topic, fanoutPeers); + this.fanout.delete(topic); + this.lastpub.delete(topic); + } else { + const peers = getWakuPeers( + this, + topic, + this._options.D, + (id: string): boolean => { + // filter direct peers and peers with negative score + return !this.direct.has(id) && this.score.score(id) >= 0; + } + ); + this.mesh.set(topic, peers); + } + this.mesh.get(topic)!.forEach((id) => { + this.log('JOIN: Add mesh link to %s in %s', id, topic); + this._sendGraft(id, topic); + }); + } + + /** + * Publish messages + * + * @override + * @param {InMessage} msg + * @returns {void} + */ + async _publish(msg: InMessage): Promise { + if (msg.receivedFrom !== this.peerId.toB58String()) { + this.score.deliverMessage(msg); + this.gossipTracer.deliverMessage(msg); + } + + const msgID = this.getMsgId(msg); + const msgIdStr = messageIdToString(msgID); + // put in seen cache + this.seenCache.put(msgIdStr); + + this.messageCache.put(msg); + + const toSend = new Set(); + msg.topicIDs.forEach((topic) => { + const peersInTopic = this.topics.get(topic); + if (!peersInTopic) { + return; + } + + // direct peers + this.direct.forEach((id) => { + toSend.add(id); + }); + + let meshPeers = this.mesh.get(topic); + if (!meshPeers || !meshPeers.size) { + // We are not in the mesh for topic, use fanout peers + meshPeers = this.fanout.get(topic); + if (!meshPeers) { + // If we are not in the fanout, then pick peers in topic above the publishThreshold + const peers = getWakuPeers(this, topic, this._options.D, (id) => { + return ( + this.score.score(id) >= + this._options.scoreThresholds.publishThreshold + ); + }); + + if (peers.size > 0) { + meshPeers = peers; + this.fanout.set(topic, peers); + } else { + meshPeers = new Set(); + } + } + // Store the latest publishing time + this.lastpub.set(topic, this._now()); + } + + meshPeers!.forEach((peer) => { + toSend.add(peer); + }); + }); + // Publish messages to peers + const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]); + toSend.forEach((id) => { + if (id === msg.from) { + return; + } + this._sendRpc(id, rpc); + }); + } +} + +// This class provides an interface to execute the waku relay protocol +export class WakuRelay { + constructor(private pubsub: Pubsub) {} + + // At this stage we are always using the same topic so we do not pass it as a parameter + async subscribe() { + await this.pubsub.subscribe(TOPIC); + } + + async publish(message: WakuMessage) { + const msg = message.toBinary(); + await this.pubsub.publish(TOPIC, msg); + } +} diff --git a/src/lib/waku_relay/relay_heartbeat.ts b/src/lib/waku_relay/relay_heartbeat.ts index a910a56f8b..0842bfee6a 100644 --- a/src/lib/waku_relay/relay_heartbeat.ts +++ b/src/lib/waku_relay/relay_heartbeat.ts @@ -2,9 +2,8 @@ import Gossipsub from 'libp2p-gossipsub'; import { Heartbeat } from 'libp2p-gossipsub/src/heartbeat'; import { shuffle } from 'libp2p-gossipsub/src/utils'; -import { getWakuPeers } from '../get_waku_peers'; - import * as constants from './constants'; +import { getWakuPeers } from './get_waku_peers'; export class RelayHeartbeat extends Heartbeat { /** @@ -153,15 +152,20 @@ export class RelayHeartbeat extends Heartbeat { if (peers.size < Dlo) { const backoff = this.gossipsub.backoff.get(topic); const ineed = D - peers.size; - const peersSet = getWakuPeers(this.gossipsub, topic, ineed, (id) => { - // filter out mesh peers, direct peers, peers we are backing off, peers with negative score - return ( - !peers.has(id) && - !this.gossipsub.direct.has(id) && - (!backoff || !backoff.has(id)) && - getScore(id) >= 0 - ); - }); + const peersSet = getWakuPeers( + this.gossipsub, + topic, + ineed, + (id: string) => { + // filter out mesh peers, direct peers, peers we are backing off, peers with negative score + return ( + !peers.has(id) && + !this.gossipsub.direct.has(id) && + (!backoff || !backoff.has(id)) && + getScore(id) >= 0 + ); + } + ); peersSet.forEach(graftPeer); } @@ -288,7 +292,7 @@ export class RelayHeartbeat extends Heartbeat { ); } ); - peersToGraft.forEach((id) => { + peersToGraft.forEach((id: string) => { this.gossipsub.log( 'HEARTBEAT: Opportunistically graft peer %s on topic %s', id, @@ -344,7 +348,7 @@ export class RelayHeartbeat extends Heartbeat { ); } ); - peersSet.forEach((id) => { + peersSet.forEach((id: string) => { fanoutPeers.add(id); }); } From 433a490decf6b1203f6ec2a1e06e2df92077f185 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 1 Apr 2021 16:41:49 +1100 Subject: [PATCH 5/8] Move waku relay codec and default topic to constants module --- src/chat/index.ts | 4 ++-- src/lib/waku.spec.ts | 4 ++-- src/lib/waku.ts | 6 +++--- src/lib/waku_relay/constants.ts | 10 ++++++++++ src/lib/waku_relay/get_waku_peers.ts | 4 ++-- src/lib/waku_relay/index.spec.ts | 22 ++++++++++++++-------- src/lib/waku_relay/index.ts | 12 ++++-------- src/test_utils/nim_waku.ts | 8 +++++--- 8 files changed, 42 insertions(+), 28 deletions(-) diff --git a/src/chat/index.ts b/src/chat/index.ts index 9be295e75b..e5760b00b8 100644 --- a/src/chat/index.ts +++ b/src/chat/index.ts @@ -3,7 +3,7 @@ import util from 'util'; import Waku from '../lib/waku'; import { WakuMessage } from '../lib/waku_message'; -import { TOPIC } from '../lib/waku_relay'; +import { RelayDefaultTopic } from '../lib/waku_relay'; import { delay } from '../test_utils/delay'; import { ChatMessage } from './chat_message'; @@ -29,7 +29,7 @@ import { ChatMessage } from './chat_message'; // TODO: Bubble event to waku, infer topic, decode msg // Tracked with https://github.com/status-im/js-waku/issues/19 - waku.libp2p.pubsub.on(TOPIC, (event) => { + waku.libp2p.pubsub.on(RelayDefaultTopic, (event) => { const wakuMsg = WakuMessage.decode(event.data); if (wakuMsg.payload) { const chatMsg = ChatMessage.decode(wakuMsg.payload); diff --git a/src/lib/waku.spec.ts b/src/lib/waku.spec.ts index 690feccb86..6f906e0e80 100644 --- a/src/lib/waku.spec.ts +++ b/src/lib/waku.spec.ts @@ -5,7 +5,7 @@ import { makeLogFileName } from '../test_utils/log_file'; import { NimWaku } from '../test_utils/nim_waku'; import Waku from './waku'; -import { CODEC } from './waku_relay'; +import { RelayCodec } from './waku_relay'; describe('Waku', function () { describe('Interop: Nim', function () { @@ -28,7 +28,7 @@ describe('Waku', function () { expect(nimPeers).to.deep.equal([ { multiaddr: multiAddrWithId, - protocol: CODEC, + protocol: RelayCodec, connected: true, }, ]); diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 129313047c..1a7c0e1bbc 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -6,7 +6,7 @@ import TCP from 'libp2p-tcp'; import Multiaddr from 'multiaddr'; import PeerId from 'peer-id'; -import { CODEC, WakuRelay, WakuRelayPubsub } from './waku_relay'; +import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay'; export interface CreateOptions { listenAddresses: string[]; @@ -56,12 +56,12 @@ export default class Waku { * @param peer The peer to dial */ async dial(peer: PeerId | Multiaddr | string) { - return this.libp2p.dialProtocol(peer, CODEC); + return this.libp2p.dialProtocol(peer, RelayCodec); } async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) { this.libp2p.peerStore.addressBook.set(peerId, multiaddr); - await this.libp2p.dialProtocol(peerId, CODEC); + await this.libp2p.dialProtocol(peerId, RelayCodec); } async stop() { diff --git a/src/lib/waku_relay/constants.ts b/src/lib/waku_relay/constants.ts index 515f2e4c4f..c63d45459e 100644 --- a/src/lib/waku_relay/constants.ts +++ b/src/lib/waku_relay/constants.ts @@ -1,6 +1,16 @@ export const second = 1000; export const minute = 60 * second; +/** + * RelayCodec is the libp2p identifier for the waku relay protocol + */ +export const RelayCodec = '/vac/waku/relay/2.0.0-beta2'; + +/** + * RelayDefaultTopic is the default gossipsub topic to use for waku relay + */ +export const RelayDefaultTopic = '/waku/2/default-waku/proto'; + /** * GossipsubHeartbeatInitialDelay is the short delay before the heartbeat timer begins * after the router is initialized. diff --git a/src/lib/waku_relay/get_waku_peers.ts b/src/lib/waku_relay/get_waku_peers.ts index 9dd788a955..cda6067db3 100644 --- a/src/lib/waku_relay/get_waku_peers.ts +++ b/src/lib/waku_relay/get_waku_peers.ts @@ -1,6 +1,6 @@ import { shuffle } from 'libp2p-gossipsub/src/utils'; -import { CODEC, WakuRelayPubsub } from './index'; +import { RelayCodec, WakuRelayPubsub } from './index'; /** * Given a topic, returns up to count peers subscribed to that topic @@ -32,7 +32,7 @@ export function getWakuPeers( if (!peerStreams) { return; } - if (peerStreams.protocol == CODEC && filter(id)) { + if (peerStreams.protocol == RelayCodec && filter(id)) { peers.push(id); } }); diff --git a/src/lib/waku_relay/index.spec.ts b/src/lib/waku_relay/index.spec.ts index 6daab699a4..6e5a964483 100644 --- a/src/lib/waku_relay/index.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -8,7 +8,7 @@ import { NimWaku } from '../../test_utils/nim_waku'; import Waku from '../waku'; import { WakuMessage } from '../waku_message'; -import { CODEC, TOPIC } from './index'; +import { RelayCodec, RelayDefaultTopic } from './index'; describe('Waku Relay', () => { afterEach(function () { @@ -61,8 +61,8 @@ describe('Waku Relay', () => { }); it('Subscribe', async function () { - const subscribers1 = waku1.libp2p.pubsub.getSubscribers(TOPIC); - const subscribers2 = waku2.libp2p.pubsub.getSubscribers(TOPIC); + const subscribers1 = waku1.libp2p.pubsub.getSubscribers(RelayDefaultTopic); + const subscribers2 = waku2.libp2p.pubsub.getSubscribers(RelayDefaultTopic); expect(subscribers1).to.contain(waku2.libp2p.peerId.toB58String()); expect(subscribers2).to.contain(waku1.libp2p.peerId.toB58String()); @@ -71,7 +71,7 @@ describe('Waku Relay', () => { it('Register correct protocols', async function () { const protocols = Array.from(waku1.libp2p.upgrader.protocols.keys()); - expect(protocols).to.contain(CODEC); + expect(protocols).to.contain(RelayCodec); expect(protocols.findIndex((value) => value.match(/sub/))).to.eq(-1); }); @@ -124,7 +124,9 @@ describe('Waku Relay', () => { it('nim subscribes to js', async function () { const nimPeerId = await nimWaku.getPeerId(); - const subscribers = waku.libp2p.pubsub.getSubscribers(TOPIC); + const subscribers = waku.libp2p.pubsub.getSubscribers( + RelayDefaultTopic + ); expect(subscribers).to.contain(nimPeerId.toB58String()); }); @@ -196,7 +198,9 @@ describe('Waku Relay', () => { }); it('nim subscribes to js', async function () { - const subscribers = waku.libp2p.pubsub.getSubscribers(TOPIC); + const subscribers = waku.libp2p.pubsub.getSubscribers( + RelayDefaultTopic + ); const nimPeerId = await nimWaku.getPeerId(); expect(subscribers).to.contain(nimPeerId.toB58String()); @@ -265,7 +269,9 @@ describe('Waku Relay', () => { }); it('nim subscribes to js', async function () { - const subscribers = waku.libp2p.pubsub.getSubscribers(TOPIC); + const subscribers = waku.libp2p.pubsub.getSubscribers( + RelayDefaultTopic + ); const nimPeerId = await nimWaku.getPeerId(); expect(subscribers).to.contain(nimPeerId.toB58String()); @@ -381,7 +387,7 @@ describe('Waku Relay', () => { function waitForNextData(pubsub: Pubsub): Promise { return new Promise((resolve) => { - pubsub.once(TOPIC, resolve); + pubsub.once(RelayDefaultTopic, resolve); }).then((msg: any) => { return WakuMessage.decode(msg.data); }); diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index cd507ed797..7b1789db11 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -6,17 +6,13 @@ import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import { WakuMessage } from '../waku_message'; +import { RelayCodec, RelayDefaultTopic } from './constants'; import { getWakuPeers } from './get_waku_peers'; import { RelayHeartbeat } from './relay_heartbeat'; export * from './constants'; export * from './relay_heartbeat'; -export const CODEC = '/vac/waku/relay/2.0.0-beta2'; - -// As per waku specs, the topic is fixed, value taken from nim-waku -export const TOPIC = '/waku/2/default-waku/proto'; - // This is the class to pass to libp2p as pubsub protocol export class WakuRelayPubsub extends Gossipsub { heartbeat: RelayHeartbeat; @@ -34,7 +30,7 @@ export class WakuRelayPubsub extends Gossipsub { this.heartbeat = new RelayHeartbeat(this); - const multicodecs = [CODEC]; + const multicodecs = [RelayCodec]; // This is the downside of using `libp2p-gossipsub` instead of // implementing WakuRelay from scratch. @@ -175,11 +171,11 @@ export class WakuRelay { // At this stage we are always using the same topic so we do not pass it as a parameter async subscribe() { - await this.pubsub.subscribe(TOPIC); + await this.pubsub.subscribe(RelayDefaultTopic); } async publish(message: WakuMessage) { const msg = message.toBinary(); - await this.pubsub.publish(TOPIC, msg); + await this.pubsub.publish(RelayDefaultTopic, msg); } } diff --git a/src/test_utils/nim_waku.ts b/src/test_utils/nim_waku.ts index 11bd4030fc..811c1eeb5a 100644 --- a/src/test_utils/nim_waku.ts +++ b/src/test_utils/nim_waku.ts @@ -8,7 +8,7 @@ import multiaddr from 'multiaddr'; import PeerId from 'peer-id'; import { WakuMessage } from '../lib/waku_message'; -import { TOPIC } from '../lib/waku_relay'; +import { RelayDefaultTopic } from '../lib/waku_relay'; import { existsAsync, mkdirAsync, openAsync } from './async_fs'; import waitForLine from './log_file'; @@ -145,7 +145,7 @@ export class NimWaku { }; const res = await this.rpcCall('post_waku_v2_relay_v1_message', [ - TOPIC, + RelayDefaultTopic, rpcMessage, ]); @@ -155,7 +155,9 @@ export class NimWaku { async messages() { this.checkProcess(); - const res = await this.rpcCall('get_waku_v2_relay_v1_messages', [TOPIC]); + const res = await this.rpcCall('get_waku_v2_relay_v1_messages', [ + RelayDefaultTopic, + ]); return res.result; } From b487e7803b4183415ec429a500a362e2e1b6526b Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 1 Apr 2021 16:44:28 +1100 Subject: [PATCH 6/8] Returns waku relay peers, not all waku peers across the various protocols --- .../{get_waku_peers.ts => get_relay_peers.ts} | 2 +- src/lib/waku_relay/index.ts | 8 ++++---- src/lib/waku_relay/relay_heartbeat.ts | 10 +++++----- 3 files changed, 10 insertions(+), 10 deletions(-) rename src/lib/waku_relay/{get_waku_peers.ts => get_relay_peers.ts} (97%) diff --git a/src/lib/waku_relay/get_waku_peers.ts b/src/lib/waku_relay/get_relay_peers.ts similarity index 97% rename from src/lib/waku_relay/get_waku_peers.ts rename to src/lib/waku_relay/get_relay_peers.ts index cda6067db3..e31a5280f3 100644 --- a/src/lib/waku_relay/get_waku_peers.ts +++ b/src/lib/waku_relay/get_relay_peers.ts @@ -13,7 +13,7 @@ import { RelayCodec, WakuRelayPubsub } from './index'; * @returns {Set} * */ -export function getWakuPeers( +export function getRelayPeers( router: WakuRelayPubsub, topic: string, count: number, diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index 7b1789db11..09bc941033 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -7,7 +7,7 @@ import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import { WakuMessage } from '../waku_message'; import { RelayCodec, RelayDefaultTopic } from './constants'; -import { getWakuPeers } from './get_waku_peers'; +import { getRelayPeers } from './get_relay_peers'; import { RelayHeartbeat } from './relay_heartbeat'; export * from './constants'; @@ -59,7 +59,7 @@ export class WakuRelayPubsub extends Gossipsub { }); if (fanoutPeers.size < this._options.D) { // we need more peers; eager, as this would get fixed in the next heartbeat - getWakuPeers( + getRelayPeers( this, topic, this._options.D - fanoutPeers.size, @@ -77,7 +77,7 @@ export class WakuRelayPubsub extends Gossipsub { this.fanout.delete(topic); this.lastpub.delete(topic); } else { - const peers = getWakuPeers( + const peers = getRelayPeers( this, topic, this._options.D, @@ -132,7 +132,7 @@ export class WakuRelayPubsub extends Gossipsub { meshPeers = this.fanout.get(topic); if (!meshPeers) { // If we are not in the fanout, then pick peers in topic above the publishThreshold - const peers = getWakuPeers(this, topic, this._options.D, (id) => { + const peers = getRelayPeers(this, topic, this._options.D, (id) => { return ( this.score.score(id) >= this._options.scoreThresholds.publishThreshold diff --git a/src/lib/waku_relay/relay_heartbeat.ts b/src/lib/waku_relay/relay_heartbeat.ts index 0842bfee6a..7d418e945e 100644 --- a/src/lib/waku_relay/relay_heartbeat.ts +++ b/src/lib/waku_relay/relay_heartbeat.ts @@ -3,7 +3,7 @@ import { Heartbeat } from 'libp2p-gossipsub/src/heartbeat'; import { shuffle } from 'libp2p-gossipsub/src/utils'; import * as constants from './constants'; -import { getWakuPeers } from './get_waku_peers'; +import { getRelayPeers } from './get_relay_peers'; export class RelayHeartbeat extends Heartbeat { /** @@ -152,7 +152,7 @@ export class RelayHeartbeat extends Heartbeat { if (peers.size < Dlo) { const backoff = this.gossipsub.backoff.get(topic); const ineed = D - peers.size; - const peersSet = getWakuPeers( + const peersSet = getRelayPeers( this.gossipsub, topic, ineed, @@ -239,7 +239,7 @@ export class RelayHeartbeat extends Heartbeat { if (outbound < Dout) { const ineed = Dout - outbound; const backoff = this.gossipsub.backoff.get(topic); - getWakuPeers(this.gossipsub, topic, ineed, (id: string): boolean => { + getRelayPeers(this.gossipsub, topic, ineed, (id: string): boolean => { // filter our current mesh peers, direct peers, peers we are backing off, peers with negative score return ( !peers.has(id) && @@ -278,7 +278,7 @@ export class RelayHeartbeat extends Heartbeat { this.gossipsub._options.scoreThresholds.opportunisticGraftThreshold ) { const backoff = this.gossipsub.backoff.get(topic); - const peersToGraft = getWakuPeers( + const peersToGraft = getRelayPeers( this.gossipsub, topic, constants.RelayOpportunisticGraftPeers, @@ -334,7 +334,7 @@ export class RelayHeartbeat extends Heartbeat { // do we need more peers? if (fanoutPeers.size < D) { const ineed = D - fanoutPeers.size; - const peersSet = getWakuPeers( + const peersSet = getRelayPeers( this.gossipsub, topic, ineed, From ff9bfa7f0a0068869e1b2fd5037838b6adb4f399 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 1 Apr 2021 16:49:35 +1100 Subject: [PATCH 7/8] Override gossipsub function that checked for gossipsub peers Instead, we need to check for waku relay peers. --- .cspell.json | 1 + src/lib/waku_relay/constants.ts | 34 ++++++++ src/lib/waku_relay/index.ts | 138 +++++++++++++++++++++++++++++++- 3 files changed, 171 insertions(+), 2 deletions(-) diff --git a/.cspell.json b/.cspell.json index dcdc79e7f6..a0c0c0293c 100644 --- a/.cspell.json +++ b/.cspell.json @@ -13,6 +13,7 @@ "codecov", "commitlint", "dependabot", + "Dlazy", "Dout", "Dscore", "editorconfig", diff --git a/src/lib/waku_relay/constants.ts b/src/lib/waku_relay/constants.ts index c63d45459e..cba56b1b7c 100644 --- a/src/lib/waku_relay/constants.ts +++ b/src/lib/waku_relay/constants.ts @@ -11,6 +11,13 @@ export const RelayCodec = '/vac/waku/relay/2.0.0-beta2'; */ export const RelayDefaultTopic = '/waku/2/default-waku/proto'; +/** + * RelayGossipFactor affects how many peers we will emit gossip to at each heartbeat. + * We will send gossip to RelayGossipFactor * (total number of non-mesh peers), or + * RelayDlazy, whichever is greater. + */ +export declare const RelayGossipFactor = 0.25; + /** * GossipsubHeartbeatInitialDelay is the short delay before the heartbeat timer begins * after the router is initialized. @@ -22,6 +29,24 @@ export const RelayHeartbeatInitialDelay = 100; */ export const RelayHeartbeatInterval = second; +/** + * RelayPrunePeers controls the number of peers to include in prune Peer eXchange. + * When we prune a peer that's eligible for PX (has a good score, etc), we will try to + * send them signed peer records for up to RelayPrunePeers other peers that we + * know of. + */ +export const RelayPrunePeers = 16; + +/** + * RelayPruneBackoff controls the backoff time for pruned peers. This is how long + * a peer must wait before attempting to graft into our mesh again after being pruned. + * When pruning a peer, we send them our value of RelayPruneBackoff so they know + * the minimum time to wait. Peers running older versions may not send a backoff time, + * so if we receive a prune message without one, we will wait at least RelayPruneBackoff + * before attempting to re-graft. + */ +export const RelayPruneBackoff = minute; + /** * RelayFanoutTTL controls how long we keep track of the fanout state. If it's been * RelayFanoutTTL since we've published to a topic that we're not subscribed to, @@ -41,3 +66,12 @@ export const RelayOpportunisticGraftTicks = 60; * RelayOpportunisticGraftPeers is the number of peers to opportunistically graft. */ export const RelayOpportunisticGraftPeers = 2; + +/** + * RelayMaxIHaveLength is the maximum number of messages to include in an IHAVE message. + * Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a + * peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the + * default if your system is pushing more than 5000 messages in GossipsubHistoryGossip heartbeats; + * with the defaults this is 1666 messages/s. + */ +export declare const RelayMaxIHaveLength = 5000; diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index 09bc941033..786a18c23d 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -1,12 +1,25 @@ import Gossipsub from 'libp2p-gossipsub'; import { Libp2p } from 'libp2p-gossipsub/src/interfaces'; -import { createGossipRpc, messageIdToString } from 'libp2p-gossipsub/src/utils'; +import { ControlPrune, PeerInfo } from 'libp2p-gossipsub/src/message'; +import { + createGossipRpc, + messageIdToString, + shuffle, +} from 'libp2p-gossipsub/src/utils'; import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; +import PeerId from 'peer-id'; import { WakuMessage } from '../waku_message'; -import { RelayCodec, RelayDefaultTopic } from './constants'; +import { + RelayCodec, + RelayDefaultTopic, + RelayGossipFactor, + RelayMaxIHaveLength, + RelayPruneBackoff, + RelayPrunePeers, +} from './constants'; import { getRelayPeers } from './get_relay_peers'; import { RelayHeartbeat } from './relay_heartbeat'; @@ -163,6 +176,127 @@ export class WakuRelayPubsub extends Gossipsub { this._sendRpc(id, rpc); }); } + + /** + * Emits gossip to peers in a particular topic + * @param {string} topic + * @param {Set} exclude peers to exclude + * @returns {void} + */ + _emitGossip(topic: string, exclude: Set): void { + const messageIDs = this.messageCache.getGossipIDs(topic); + if (!messageIDs.length) { + return; + } + + // shuffle to emit in random order + shuffle(messageIDs); + + // if we are emitting more than GossipsubMaxIHaveLength ids, truncate the list + if (messageIDs.length > RelayMaxIHaveLength) { + // we do the truncation (with shuffling) per peer below + this.log( + 'too many messages for gossip; will truncate IHAVE list (%d messages)', + messageIDs.length + ); + } + + // Send gossip to GossipFactor peers above threshold with a minimum of D_lazy + // First we collect the peers above gossipThreshold that are not in the exclude set + // and then randomly select from that set + // We also exclude direct peers, as there is no reason to emit gossip to them + const peersToGossip: string[] = []; + const topicPeers = this.topics.get(topic); + if (!topicPeers) { + // no topic peers, no gossip + return; + } + topicPeers.forEach((id) => { + const peerStreams = this.peers.get(id); + if (!peerStreams) { + return; + } + if ( + !exclude.has(id) && + !this.direct.has(id) && + peerStreams.protocol == RelayCodec && + this.score.score(id) >= this._options.scoreThresholds.gossipThreshold + ) { + peersToGossip.push(id); + } + }); + + let target = this._options.Dlazy; + const factor = RelayGossipFactor * peersToGossip.length; + if (factor > target) { + target = factor; + } + if (target > peersToGossip.length) { + target = peersToGossip.length; + } else { + shuffle(peersToGossip); + } + // Emit the IHAVE gossip to the selected peers up to the target + peersToGossip.slice(0, target).forEach((id) => { + let peerMessageIDs = messageIDs; + if (messageIDs.length > RelayMaxIHaveLength) { + // shuffle and slice message IDs per peer so that we emit a different set for each peer + // we have enough redundancy in the system that this will significantly increase the message + // coverage when we do truncate + peerMessageIDs = shuffle(peerMessageIDs.slice()).slice( + 0, + RelayMaxIHaveLength + ); + } + this._pushGossip(id, { + topicID: topic, + messageIDs: peerMessageIDs, + }); + }); + } + + /** + * Make a PRUNE control message for a peer in a topic + * @param {string} id + * @param {string} topic + * @param {boolean} doPX + * @returns {ControlPrune} + */ + _makePrune(id: string, topic: string, doPX: boolean): ControlPrune { + // backoff is measured in seconds + // RelayPruneBackoff is measured in milliseconds + const backoff = RelayPruneBackoff / 1000; + const px: PeerInfo[] = []; + if (doPX) { + // select peers for Peer eXchange + const peers = getRelayPeers( + this, + topic, + RelayPrunePeers, + (xid: string): boolean => { + return xid !== id && this.score.score(xid) >= 0; + } + ); + peers.forEach((p) => { + // see if we have a signed record to send back; if we don't, just send + // the peer ID and let the pruned peer find them in the DHT -- we can't trust + // unsigned address records through PX anyways + // Finding signed records in the DHT is not supported at the time of writing in js-libp2p + const peerId = PeerId.createFromB58String(p); + px.push({ + peerID: peerId.toBytes(), + signedPeerRecord: this._libp2p.peerStore.addressBook.getRawEnvelope( + peerId + ), + }); + }); + } + return { + topicID: topic, + peers: px, + backoff: backoff, + }; + } } // This class provides an interface to execute the waku relay protocol From ff8e96b60ca49bb4248416d997f86ab99c8a394c Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 1 Apr 2021 18:27:15 +1100 Subject: [PATCH 8/8] Clean up imports --- src/chat/index.ts | 2 +- src/lib/waku.spec.ts | 4 +--- src/lib/waku_relay/index.ts | 29 +++++++++++------------------ src/test_utils/index.ts | 5 +++++ 4 files changed, 18 insertions(+), 22 deletions(-) create mode 100644 src/test_utils/index.ts diff --git a/src/chat/index.ts b/src/chat/index.ts index e5760b00b8..2b079380a2 100644 --- a/src/chat/index.ts +++ b/src/chat/index.ts @@ -4,7 +4,7 @@ import util from 'util'; import Waku from '../lib/waku'; import { WakuMessage } from '../lib/waku_message'; import { RelayDefaultTopic } from '../lib/waku_relay'; -import { delay } from '../test_utils/delay'; +import { delay } from '../test_utils/'; import { ChatMessage } from './chat_message'; diff --git a/src/lib/waku.spec.ts b/src/lib/waku.spec.ts index 6f906e0e80..836faef89b 100644 --- a/src/lib/waku.spec.ts +++ b/src/lib/waku.spec.ts @@ -1,8 +1,6 @@ import { expect } from 'chai'; -import { NOISE_KEY_1 } from '../test_utils/constants'; -import { makeLogFileName } from '../test_utils/log_file'; -import { NimWaku } from '../test_utils/nim_waku'; +import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../test_utils/'; import Waku from './waku'; import { RelayCodec } from './waku_relay'; diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index 786a18c23d..286aa874f8 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -12,14 +12,7 @@ import PeerId from 'peer-id'; import { WakuMessage } from '../waku_message'; -import { - RelayCodec, - RelayDefaultTopic, - RelayGossipFactor, - RelayMaxIHaveLength, - RelayPruneBackoff, - RelayPrunePeers, -} from './constants'; +import * as constants from './constants'; import { getRelayPeers } from './get_relay_peers'; import { RelayHeartbeat } from './relay_heartbeat'; @@ -43,7 +36,7 @@ export class WakuRelayPubsub extends Gossipsub { this.heartbeat = new RelayHeartbeat(this); - const multicodecs = [RelayCodec]; + const multicodecs = [constants.RelayCodec]; // This is the downside of using `libp2p-gossipsub` instead of // implementing WakuRelay from scratch. @@ -193,7 +186,7 @@ export class WakuRelayPubsub extends Gossipsub { shuffle(messageIDs); // if we are emitting more than GossipsubMaxIHaveLength ids, truncate the list - if (messageIDs.length > RelayMaxIHaveLength) { + if (messageIDs.length > constants.RelayMaxIHaveLength) { // we do the truncation (with shuffling) per peer below this.log( 'too many messages for gossip; will truncate IHAVE list (%d messages)', @@ -219,7 +212,7 @@ export class WakuRelayPubsub extends Gossipsub { if ( !exclude.has(id) && !this.direct.has(id) && - peerStreams.protocol == RelayCodec && + peerStreams.protocol == constants.RelayCodec && this.score.score(id) >= this._options.scoreThresholds.gossipThreshold ) { peersToGossip.push(id); @@ -227,7 +220,7 @@ export class WakuRelayPubsub extends Gossipsub { }); let target = this._options.Dlazy; - const factor = RelayGossipFactor * peersToGossip.length; + const factor = constants.RelayGossipFactor * peersToGossip.length; if (factor > target) { target = factor; } @@ -239,13 +232,13 @@ export class WakuRelayPubsub extends Gossipsub { // Emit the IHAVE gossip to the selected peers up to the target peersToGossip.slice(0, target).forEach((id) => { let peerMessageIDs = messageIDs; - if (messageIDs.length > RelayMaxIHaveLength) { + if (messageIDs.length > constants.RelayMaxIHaveLength) { // shuffle and slice message IDs per peer so that we emit a different set for each peer // we have enough redundancy in the system that this will significantly increase the message // coverage when we do truncate peerMessageIDs = shuffle(peerMessageIDs.slice()).slice( 0, - RelayMaxIHaveLength + constants.RelayMaxIHaveLength ); } this._pushGossip(id, { @@ -265,14 +258,14 @@ export class WakuRelayPubsub extends Gossipsub { _makePrune(id: string, topic: string, doPX: boolean): ControlPrune { // backoff is measured in seconds // RelayPruneBackoff is measured in milliseconds - const backoff = RelayPruneBackoff / 1000; + const backoff = constants.RelayPruneBackoff / 1000; const px: PeerInfo[] = []; if (doPX) { // select peers for Peer eXchange const peers = getRelayPeers( this, topic, - RelayPrunePeers, + constants.RelayPrunePeers, (xid: string): boolean => { return xid !== id && this.score.score(xid) >= 0; } @@ -305,11 +298,11 @@ export class WakuRelay { // At this stage we are always using the same topic so we do not pass it as a parameter async subscribe() { - await this.pubsub.subscribe(RelayDefaultTopic); + await this.pubsub.subscribe(constants.RelayDefaultTopic); } async publish(message: WakuMessage) { const msg = message.toBinary(); - await this.pubsub.publish(RelayDefaultTopic, msg); + await this.pubsub.publish(constants.RelayDefaultTopic, msg); } } diff --git a/src/test_utils/index.ts b/src/test_utils/index.ts new file mode 100644 index 0000000000..62a0c6e5aa --- /dev/null +++ b/src/test_utils/index.ts @@ -0,0 +1,5 @@ +export * from './async_fs'; +export * from './constants'; +export * from './delay'; +export * from './log_file'; +export * from './nim_waku';