From ff9bfa7f0a0068869e1b2fd5037838b6adb4f399 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 1 Apr 2021 16:49:35 +1100 Subject: [PATCH] Override gossipsub function that checked for gossipsub peers Instead, we need to check for waku relay peers. --- .cspell.json | 1 + src/lib/waku_relay/constants.ts | 34 ++++++++ src/lib/waku_relay/index.ts | 138 +++++++++++++++++++++++++++++++- 3 files changed, 171 insertions(+), 2 deletions(-) diff --git a/.cspell.json b/.cspell.json index dcdc79e7f6..a0c0c0293c 100644 --- a/.cspell.json +++ b/.cspell.json @@ -13,6 +13,7 @@ "codecov", "commitlint", "dependabot", + "Dlazy", "Dout", "Dscore", "editorconfig", diff --git a/src/lib/waku_relay/constants.ts b/src/lib/waku_relay/constants.ts index c63d45459e..cba56b1b7c 100644 --- a/src/lib/waku_relay/constants.ts +++ b/src/lib/waku_relay/constants.ts @@ -11,6 +11,13 @@ export const RelayCodec = '/vac/waku/relay/2.0.0-beta2'; */ export const RelayDefaultTopic = '/waku/2/default-waku/proto'; +/** + * RelayGossipFactor affects how many peers we will emit gossip to at each heartbeat. + * We will send gossip to RelayGossipFactor * (total number of non-mesh peers), or + * RelayDlazy, whichever is greater. + */ +export declare const RelayGossipFactor = 0.25; + /** * GossipsubHeartbeatInitialDelay is the short delay before the heartbeat timer begins * after the router is initialized. @@ -22,6 +29,24 @@ export const RelayHeartbeatInitialDelay = 100; */ export const RelayHeartbeatInterval = second; +/** + * RelayPrunePeers controls the number of peers to include in prune Peer eXchange. + * When we prune a peer that's eligible for PX (has a good score, etc), we will try to + * send them signed peer records for up to RelayPrunePeers other peers that we + * know of. + */ +export const RelayPrunePeers = 16; + +/** + * RelayPruneBackoff controls the backoff time for pruned peers. This is how long + * a peer must wait before attempting to graft into our mesh again after being pruned. + * When pruning a peer, we send them our value of RelayPruneBackoff so they know + * the minimum time to wait. Peers running older versions may not send a backoff time, + * so if we receive a prune message without one, we will wait at least RelayPruneBackoff + * before attempting to re-graft. + */ +export const RelayPruneBackoff = minute; + /** * RelayFanoutTTL controls how long we keep track of the fanout state. If it's been * RelayFanoutTTL since we've published to a topic that we're not subscribed to, @@ -41,3 +66,12 @@ export const RelayOpportunisticGraftTicks = 60; * RelayOpportunisticGraftPeers is the number of peers to opportunistically graft. */ export const RelayOpportunisticGraftPeers = 2; + +/** + * RelayMaxIHaveLength is the maximum number of messages to include in an IHAVE message. + * Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a + * peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the + * default if your system is pushing more than 5000 messages in GossipsubHistoryGossip heartbeats; + * with the defaults this is 1666 messages/s. + */ +export declare const RelayMaxIHaveLength = 5000; diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index 09bc941033..786a18c23d 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -1,12 +1,25 @@ import Gossipsub from 'libp2p-gossipsub'; import { Libp2p } from 'libp2p-gossipsub/src/interfaces'; -import { createGossipRpc, messageIdToString } from 'libp2p-gossipsub/src/utils'; +import { ControlPrune, PeerInfo } from 'libp2p-gossipsub/src/message'; +import { + createGossipRpc, + messageIdToString, + shuffle, +} from 'libp2p-gossipsub/src/utils'; import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; +import PeerId from 'peer-id'; import { WakuMessage } from '../waku_message'; -import { RelayCodec, RelayDefaultTopic } from './constants'; +import { + RelayCodec, + RelayDefaultTopic, + RelayGossipFactor, + RelayMaxIHaveLength, + RelayPruneBackoff, + RelayPrunePeers, +} from './constants'; import { getRelayPeers } from './get_relay_peers'; import { RelayHeartbeat } from './relay_heartbeat'; @@ -163,6 +176,127 @@ export class WakuRelayPubsub extends Gossipsub { this._sendRpc(id, rpc); }); } + + /** + * Emits gossip to peers in a particular topic + * @param {string} topic + * @param {Set} exclude peers to exclude + * @returns {void} + */ + _emitGossip(topic: string, exclude: Set): void { + const messageIDs = this.messageCache.getGossipIDs(topic); + if (!messageIDs.length) { + return; + } + + // shuffle to emit in random order + shuffle(messageIDs); + + // if we are emitting more than GossipsubMaxIHaveLength ids, truncate the list + if (messageIDs.length > RelayMaxIHaveLength) { + // we do the truncation (with shuffling) per peer below + this.log( + 'too many messages for gossip; will truncate IHAVE list (%d messages)', + messageIDs.length + ); + } + + // Send gossip to GossipFactor peers above threshold with a minimum of D_lazy + // First we collect the peers above gossipThreshold that are not in the exclude set + // and then randomly select from that set + // We also exclude direct peers, as there is no reason to emit gossip to them + const peersToGossip: string[] = []; + const topicPeers = this.topics.get(topic); + if (!topicPeers) { + // no topic peers, no gossip + return; + } + topicPeers.forEach((id) => { + const peerStreams = this.peers.get(id); + if (!peerStreams) { + return; + } + if ( + !exclude.has(id) && + !this.direct.has(id) && + peerStreams.protocol == RelayCodec && + this.score.score(id) >= this._options.scoreThresholds.gossipThreshold + ) { + peersToGossip.push(id); + } + }); + + let target = this._options.Dlazy; + const factor = RelayGossipFactor * peersToGossip.length; + if (factor > target) { + target = factor; + } + if (target > peersToGossip.length) { + target = peersToGossip.length; + } else { + shuffle(peersToGossip); + } + // Emit the IHAVE gossip to the selected peers up to the target + peersToGossip.slice(0, target).forEach((id) => { + let peerMessageIDs = messageIDs; + if (messageIDs.length > RelayMaxIHaveLength) { + // shuffle and slice message IDs per peer so that we emit a different set for each peer + // we have enough redundancy in the system that this will significantly increase the message + // coverage when we do truncate + peerMessageIDs = shuffle(peerMessageIDs.slice()).slice( + 0, + RelayMaxIHaveLength + ); + } + this._pushGossip(id, { + topicID: topic, + messageIDs: peerMessageIDs, + }); + }); + } + + /** + * Make a PRUNE control message for a peer in a topic + * @param {string} id + * @param {string} topic + * @param {boolean} doPX + * @returns {ControlPrune} + */ + _makePrune(id: string, topic: string, doPX: boolean): ControlPrune { + // backoff is measured in seconds + // RelayPruneBackoff is measured in milliseconds + const backoff = RelayPruneBackoff / 1000; + const px: PeerInfo[] = []; + if (doPX) { + // select peers for Peer eXchange + const peers = getRelayPeers( + this, + topic, + RelayPrunePeers, + (xid: string): boolean => { + return xid !== id && this.score.score(xid) >= 0; + } + ); + peers.forEach((p) => { + // see if we have a signed record to send back; if we don't, just send + // the peer ID and let the pruned peer find them in the DHT -- we can't trust + // unsigned address records through PX anyways + // Finding signed records in the DHT is not supported at the time of writing in js-libp2p + const peerId = PeerId.createFromB58String(p); + px.push({ + peerID: peerId.toBytes(), + signedPeerRecord: this._libp2p.peerStore.addressBook.getRawEnvelope( + peerId + ), + }); + }); + } + return { + topicID: topic, + peers: px, + backoff: backoff, + }; + } } // This class provides an interface to execute the waku relay protocol