diff --git a/.cspell.json b/.cspell.json index 8ed6e3062c..dcdc79e7f6 100644 --- a/.cspell.json +++ b/.cspell.json @@ -3,6 +3,8 @@ "$schema": "https://raw.githubusercontent.com/streetsidesoftware/cspell/master/cspell.schema.json", "language": "en", "words": [ + "backoff", + "backoffs", "bitjson", "bitauth", "bufbuild", @@ -11,6 +13,8 @@ "codecov", "commitlint", "dependabot", + "Dout", + "Dscore", "editorconfig", "esnext", "execa", @@ -18,6 +22,11 @@ "fanout", "globby", "gossipsub", + "iasked", + "ihave", + "ihaves", + "ineed", + "iwant", "lastpub", "libauth", "libp", @@ -28,6 +37,7 @@ "mplex", "muxer", "nodekey", + "peerhave", "prettierignore", "protobuf", "protoc", @@ -38,6 +48,7 @@ "submodules", "transpiled", "typedoc", + "unmounts", "untracked", "upgrader", "waku", diff --git a/src/lib/waku_relay.ts b/src/lib/waku_relay.ts index 7e10757f71..87d2727281 100644 --- a/src/lib/waku_relay.ts +++ b/src/lib/waku_relay.ts @@ -6,6 +6,7 @@ import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import { getWakuPeers } from './get_waku_peers'; import { WakuMessage } from './waku_message'; +import { RelayHeartbeat } from './waku_relay/relay_heartbeat'; export const CODEC = '/vac/waku/relay/2.0.0-beta2'; @@ -14,6 +15,8 @@ export const TOPIC = '/waku/2/default-waku/proto'; // This is the class to pass to libp2p as pubsub protocol export class WakuRelayPubsub extends Gossipsub { + heartbeat: RelayHeartbeat; + /** * * @param libp2p: Libp2p @@ -25,6 +28,8 @@ export class WakuRelayPubsub extends Gossipsub { globalSignaturePolicy: SignaturePolicy.StrictNoSign, }); + this.heartbeat = new RelayHeartbeat(this); + const multicodecs = [CODEC]; // This is the downside of using `libp2p-gossipsub` instead of diff --git a/src/lib/waku_relay/constants.ts b/src/lib/waku_relay/constants.ts new file mode 100644 index 0000000000..515f2e4c4f --- /dev/null +++ b/src/lib/waku_relay/constants.ts @@ -0,0 +1,33 @@ +export const second = 1000; +export const minute = 60 * second; + +/** + * GossipsubHeartbeatInitialDelay is the short delay before the heartbeat timer begins + * after the router is initialized. + */ +export const RelayHeartbeatInitialDelay = 100; + +/** + * RelayHeartbeatInterval controls the time between heartbeats. + */ +export const RelayHeartbeatInterval = second; + +/** + * RelayFanoutTTL controls how long we keep track of the fanout state. If it's been + * RelayFanoutTTL since we've published to a topic that we're not subscribed to, + * we'll delete the fanout map for that topic. + */ +export const RelayFanoutTTL = minute; + +/** + * RelayOpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh + * with opportunistic grafting. Every RelayOpportunisticGraftTicks we will attempt to select some + * high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls + * below a threshold + */ +export const RelayOpportunisticGraftTicks = 60; + +/** + * RelayOpportunisticGraftPeers is the number of peers to opportunistically graft. + */ +export const RelayOpportunisticGraftPeers = 2; diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts new file mode 100644 index 0000000000..e93d868e54 --- /dev/null +++ b/src/lib/waku_relay/index.ts @@ -0,0 +1,2 @@ +export * from './constants'; +export * from './relay_heartbeat'; diff --git a/src/lib/waku_relay/relay_heartbeat.ts b/src/lib/waku_relay/relay_heartbeat.ts new file mode 100644 index 0000000000..a910a56f8b --- /dev/null +++ b/src/lib/waku_relay/relay_heartbeat.ts @@ -0,0 +1,368 @@ +import Gossipsub from 'libp2p-gossipsub'; +import { Heartbeat } from 'libp2p-gossipsub/src/heartbeat'; +import { shuffle } from 'libp2p-gossipsub/src/utils'; + +import { getWakuPeers } from '../get_waku_peers'; + +import * as constants from './constants'; + +export class RelayHeartbeat extends Heartbeat { + /** + * @param {Object} gossipsub + * @constructor + */ + constructor(gossipsub: Gossipsub) { + super(gossipsub); + } + + start(): void { + if (this._heartbeatTimer) { + return; + } + + const heartbeat = this._heartbeat.bind(this); + + const timeout = setTimeout(() => { + heartbeat(); + this._heartbeatTimer!.runPeriodically( + heartbeat, + constants.RelayHeartbeatInterval + ); + }, constants.RelayHeartbeatInitialDelay); + + this._heartbeatTimer = { + _intervalId: undefined, + runPeriodically: (fn, period) => { + this._heartbeatTimer!._intervalId = setInterval(fn, period); + }, + cancel: () => { + clearTimeout(timeout); + clearInterval(this._heartbeatTimer!._intervalId as NodeJS.Timeout); + }, + }; + } + + /** + * Unmounts the gossipsub protocol and shuts down every connection + * @override + * @returns {void} + */ + stop(): void { + if (!this._heartbeatTimer) { + return; + } + + this._heartbeatTimer.cancel(); + this._heartbeatTimer = null; + } + + /** + * Maintains the mesh and fanout maps in gossipsub. + * + * @returns {void} + */ + _heartbeat(): void { + const { D, Dlo, Dhi, Dscore, Dout } = this.gossipsub._options; + this.gossipsub.heartbeatTicks++; + + // cache scores through the heartbeat + const scores = new Map(); + const getScore = (id: string): number => { + let s = scores.get(id); + if (s === undefined) { + s = this.gossipsub.score.score(id); + scores.set(id, s); + } + return s; + }; + + // peer id => topic[] + const toGraft = new Map(); + // peer id => topic[] + const toPrune = new Map(); + // peer id => don't px + const noPX = new Map(); + + // clean up expired backoffs + this.gossipsub._clearBackoff(); + + // clean up peerhave/iasked counters + this.gossipsub.peerhave.clear(); + this.gossipsub.iasked.clear(); + + // apply IWANT request penalties + this.gossipsub._applyIwantPenalties(); + + // ensure direct peers are connected + this.gossipsub._directConnect(); + + // maintain the mesh for topics we have joined + this.gossipsub.mesh.forEach((peers, topic) => { + // prune/graft helper functions (defined per topic) + const prunePeer = (id: string): void => { + this.gossipsub.log( + 'HEARTBEAT: Remove mesh link to %s in %s', + id, + topic + ); + // update peer score + this.gossipsub.score.prune(id, topic); + // add prune backoff record + this.gossipsub._addBackoff(id, topic); + // remove peer from mesh + peers.delete(id); + // add to toPrune + const topics = toPrune.get(id); + if (!topics) { + toPrune.set(id, [topic]); + } else { + topics.push(topic); + } + }; + const graftPeer = (id: string): void => { + this.gossipsub.log('HEARTBEAT: Add mesh link to %s in %s', id, topic); + // update peer score + this.gossipsub.score.graft(id, topic); + // add peer to mesh + peers.add(id); + // add to toGraft + const topics = toGraft.get(id); + if (!topics) { + toGraft.set(id, [topic]); + } else { + topics.push(topic); + } + }; + + // drop all peers with negative score, without PX + peers.forEach((id) => { + const score = getScore(id); + if (score < 0) { + this.gossipsub.log( + 'HEARTBEAT: Prune peer %s with negative score: score=%d, topic=%s', + id, + score, + topic + ); + prunePeer(id); + noPX.set(id, true); + } + }); + + // do we have enough peers? + if (peers.size < Dlo) { + const backoff = this.gossipsub.backoff.get(topic); + const ineed = D - peers.size; + const peersSet = getWakuPeers(this.gossipsub, topic, ineed, (id) => { + // filter out mesh peers, direct peers, peers we are backing off, peers with negative score + return ( + !peers.has(id) && + !this.gossipsub.direct.has(id) && + (!backoff || !backoff.has(id)) && + getScore(id) >= 0 + ); + }); + + peersSet.forEach(graftPeer); + } + + // do we have to many peers? + if (peers.size > Dhi) { + let peersArray = Array.from(peers); + // sort by score + peersArray.sort((a, b) => getScore(b) - getScore(a)); + // We keep the first D_score peers by score and the remaining up to D randomly + // under the constraint that we keep D_out peers in the mesh (if we have that many) + peersArray = peersArray + .slice(0, Dscore) + .concat(shuffle(peersArray.slice(Dscore))); + + // count the outbound peers we are keeping + let outbound = 0; + peersArray.slice(0, D).forEach((p) => { + if (this.gossipsub.outbound.get(p)) { + outbound++; + } + }); + + // if it's less than D_out, bubble up some outbound peers from the random selection + if (outbound < Dout) { + const rotate = (i: number): void => { + // rotate the peersArray to the right and put the ith peer in the front + const p = peersArray[i]; + for (let j = i; j > 0; j--) { + peersArray[j] = peersArray[j - 1]; + } + peersArray[0] = p; + }; + + // first bubble up all outbound peers already in the selection to the front + if (outbound > 0) { + let ihave = outbound; + for (let i = 1; i < D && ihave > 0; i++) { + if (this.gossipsub.outbound.get(peersArray[i])) { + rotate(i); + ihave--; + } + } + } + + // now bubble up enough outbound peers outside the selection to the front + let ineed = D - outbound; + for (let i = D; i < peersArray.length && ineed > 0; i++) { + if (this.gossipsub.outbound.get(peersArray[i])) { + rotate(i); + ineed--; + } + } + } + + // prune the excess peers + peersArray.slice(D).forEach(prunePeer); + } + + // do we have enough outbound peers? + if (peers.size >= Dlo) { + // count the outbound peers we have + let outbound = 0; + peers.forEach((p) => { + if (this.gossipsub.outbound.get(p)) { + outbound++; + } + }); + + // if it's less than D_out, select some peers with outbound connections and graft them + if (outbound < Dout) { + const ineed = Dout - outbound; + const backoff = this.gossipsub.backoff.get(topic); + getWakuPeers(this.gossipsub, topic, ineed, (id: string): boolean => { + // filter our current mesh peers, direct peers, peers we are backing off, peers with negative score + return ( + !peers.has(id) && + !this.gossipsub.direct.has(id) && + (!backoff || !backoff.has(id)) && + getScore(id) >= 0 + ); + }).forEach(graftPeer); + } + } + + // should we try to improve the mesh with opportunistic grafting? + if ( + this.gossipsub.heartbeatTicks % + constants.RelayOpportunisticGraftTicks === + 0 && + peers.size > 1 + ) { + // Opportunistic grafting works as follows: we check the median score of peers in the + // mesh; if this score is below the opportunisticGraftThreshold, we select a few peers at + // random with score over the median. + // The intention is to (slowly) improve an under performing mesh by introducing good + // scoring peers that may have been gossiping at us. This allows us to get out of sticky + // situations where we are stuck with poor peers and also recover from churn of good peers. + + // now compute the median peer score in the mesh + const peersList = Array.from(peers).sort( + (a, b) => getScore(a) - getScore(b) + ); + const medianIndex = peers.size / 2; + const medianScore = getScore(peersList[medianIndex]); + + // if the median score is below the threshold, select a better peer (if any) and GRAFT + if ( + medianScore < + this.gossipsub._options.scoreThresholds.opportunisticGraftThreshold + ) { + const backoff = this.gossipsub.backoff.get(topic); + const peersToGraft = getWakuPeers( + this.gossipsub, + topic, + constants.RelayOpportunisticGraftPeers, + (id: string): boolean => { + // filter out current mesh peers, direct peers, peers we are backing off, peers below or at threshold + return ( + peers.has(id) && + !this.gossipsub.direct.has(id) && + (!backoff || !backoff.has(id)) && + getScore(id) > medianScore + ); + } + ); + peersToGraft.forEach((id) => { + this.gossipsub.log( + 'HEARTBEAT: Opportunistically graft peer %s on topic %s', + id, + topic + ); + graftPeer(id); + }); + } + } + + // 2nd arg are mesh peers excluded from gossip. We have already pushed + // messages to them, so its redundant to gossip IHAVEs. + this.gossipsub._emitGossip(topic, peers); + }); + + // expire fanout for topics we haven't published to in a while + const now = this.gossipsub._now(); + this.gossipsub.lastpub.forEach((lastpub, topic) => { + if (lastpub + constants.RelayFanoutTTL < now) { + this.gossipsub.fanout.delete(topic); + this.gossipsub.lastpub.delete(topic); + } + }); + + // maintain our fanout for topics we are publishing but we have not joined + this.gossipsub.fanout.forEach((fanoutPeers, topic) => { + // checks whether our peers are still in the topic and have a score above the publish threshold + const topicPeers = this.gossipsub.topics.get(topic); + fanoutPeers.forEach((id) => { + if ( + !topicPeers!.has(id) || + getScore(id) < + this.gossipsub._options.scoreThresholds.publishThreshold + ) { + fanoutPeers.delete(id); + } + }); + + // do we need more peers? + if (fanoutPeers.size < D) { + const ineed = D - fanoutPeers.size; + const peersSet = getWakuPeers( + this.gossipsub, + topic, + ineed, + (id: string): boolean => { + // filter out existing fanout peers, direct peers, and peers with score above the publish threshold + return ( + !fanoutPeers.has(id) && + !this.gossipsub.direct.has(id) && + getScore(id) >= + this.gossipsub._options.scoreThresholds.publishThreshold + ); + } + ); + peersSet.forEach((id) => { + fanoutPeers.add(id); + }); + } + + // 2nd arg are fanout peers excluded from gossip. + // We have already pushed messages to them, so its redundant to gossip IHAVEs + this.gossipsub._emitGossip(topic, fanoutPeers); + }); + + // send coalesced GRAFT/PRUNE messages (will piggyback gossip) + this.gossipsub._sendGraftPrune(toGraft, toPrune, noPX); + + // flush pending gossip that wasn't piggybacked above + this.gossipsub._flush(); + + // advance the message history window + this.gossipsub.messageCache.shift(); + + this.gossipsub.emit('gossipsub:heartbeat'); + } +}