mirror of https://github.com/waku-org/js-waku.git
Override gossipsub function that checked for gossipsub peers
Instead, we need to check for waku relay peers.
This commit is contained in:
parent
b487e7803b
commit
ff9bfa7f0a
|
@ -13,6 +13,7 @@
|
||||||
"codecov",
|
"codecov",
|
||||||
"commitlint",
|
"commitlint",
|
||||||
"dependabot",
|
"dependabot",
|
||||||
|
"Dlazy",
|
||||||
"Dout",
|
"Dout",
|
||||||
"Dscore",
|
"Dscore",
|
||||||
"editorconfig",
|
"editorconfig",
|
||||||
|
|
|
@ -11,6 +11,13 @@ export const RelayCodec = '/vac/waku/relay/2.0.0-beta2';
|
||||||
*/
|
*/
|
||||||
export const RelayDefaultTopic = '/waku/2/default-waku/proto';
|
export const RelayDefaultTopic = '/waku/2/default-waku/proto';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RelayGossipFactor affects how many peers we will emit gossip to at each heartbeat.
|
||||||
|
* We will send gossip to RelayGossipFactor * (total number of non-mesh peers), or
|
||||||
|
* RelayDlazy, whichever is greater.
|
||||||
|
*/
|
||||||
|
export declare const RelayGossipFactor = 0.25;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* GossipsubHeartbeatInitialDelay is the short delay before the heartbeat timer begins
|
* GossipsubHeartbeatInitialDelay is the short delay before the heartbeat timer begins
|
||||||
* after the router is initialized.
|
* after the router is initialized.
|
||||||
|
@ -22,6 +29,24 @@ export const RelayHeartbeatInitialDelay = 100;
|
||||||
*/
|
*/
|
||||||
export const RelayHeartbeatInterval = second;
|
export const RelayHeartbeatInterval = second;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RelayPrunePeers controls the number of peers to include in prune Peer eXchange.
|
||||||
|
* When we prune a peer that's eligible for PX (has a good score, etc), we will try to
|
||||||
|
* send them signed peer records for up to RelayPrunePeers other peers that we
|
||||||
|
* know of.
|
||||||
|
*/
|
||||||
|
export const RelayPrunePeers = 16;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RelayPruneBackoff controls the backoff time for pruned peers. This is how long
|
||||||
|
* a peer must wait before attempting to graft into our mesh again after being pruned.
|
||||||
|
* When pruning a peer, we send them our value of RelayPruneBackoff so they know
|
||||||
|
* the minimum time to wait. Peers running older versions may not send a backoff time,
|
||||||
|
* so if we receive a prune message without one, we will wait at least RelayPruneBackoff
|
||||||
|
* before attempting to re-graft.
|
||||||
|
*/
|
||||||
|
export const RelayPruneBackoff = minute;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RelayFanoutTTL controls how long we keep track of the fanout state. If it's been
|
* RelayFanoutTTL controls how long we keep track of the fanout state. If it's been
|
||||||
* RelayFanoutTTL since we've published to a topic that we're not subscribed to,
|
* RelayFanoutTTL since we've published to a topic that we're not subscribed to,
|
||||||
|
@ -41,3 +66,12 @@ export const RelayOpportunisticGraftTicks = 60;
|
||||||
* RelayOpportunisticGraftPeers is the number of peers to opportunistically graft.
|
* RelayOpportunisticGraftPeers is the number of peers to opportunistically graft.
|
||||||
*/
|
*/
|
||||||
export const RelayOpportunisticGraftPeers = 2;
|
export const RelayOpportunisticGraftPeers = 2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RelayMaxIHaveLength is the maximum number of messages to include in an IHAVE message.
|
||||||
|
* Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
|
||||||
|
* peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
|
||||||
|
* default if your system is pushing more than 5000 messages in GossipsubHistoryGossip heartbeats;
|
||||||
|
* with the defaults this is 1666 messages/s.
|
||||||
|
*/
|
||||||
|
export declare const RelayMaxIHaveLength = 5000;
|
||||||
|
|
|
@ -1,12 +1,25 @@
|
||||||
import Gossipsub from 'libp2p-gossipsub';
|
import Gossipsub from 'libp2p-gossipsub';
|
||||||
import { Libp2p } from 'libp2p-gossipsub/src/interfaces';
|
import { Libp2p } from 'libp2p-gossipsub/src/interfaces';
|
||||||
import { createGossipRpc, messageIdToString } from 'libp2p-gossipsub/src/utils';
|
import { ControlPrune, PeerInfo } from 'libp2p-gossipsub/src/message';
|
||||||
|
import {
|
||||||
|
createGossipRpc,
|
||||||
|
messageIdToString,
|
||||||
|
shuffle,
|
||||||
|
} from 'libp2p-gossipsub/src/utils';
|
||||||
import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub';
|
import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub';
|
||||||
import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
|
import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
|
||||||
|
import PeerId from 'peer-id';
|
||||||
|
|
||||||
import { WakuMessage } from '../waku_message';
|
import { WakuMessage } from '../waku_message';
|
||||||
|
|
||||||
import { RelayCodec, RelayDefaultTopic } from './constants';
|
import {
|
||||||
|
RelayCodec,
|
||||||
|
RelayDefaultTopic,
|
||||||
|
RelayGossipFactor,
|
||||||
|
RelayMaxIHaveLength,
|
||||||
|
RelayPruneBackoff,
|
||||||
|
RelayPrunePeers,
|
||||||
|
} from './constants';
|
||||||
import { getRelayPeers } from './get_relay_peers';
|
import { getRelayPeers } from './get_relay_peers';
|
||||||
import { RelayHeartbeat } from './relay_heartbeat';
|
import { RelayHeartbeat } from './relay_heartbeat';
|
||||||
|
|
||||||
|
@ -163,6 +176,127 @@ export class WakuRelayPubsub extends Gossipsub {
|
||||||
this._sendRpc(id, rpc);
|
this._sendRpc(id, rpc);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Emits gossip to peers in a particular topic
|
||||||
|
* @param {string} topic
|
||||||
|
* @param {Set<string>} exclude peers to exclude
|
||||||
|
* @returns {void}
|
||||||
|
*/
|
||||||
|
_emitGossip(topic: string, exclude: Set<string>): void {
|
||||||
|
const messageIDs = this.messageCache.getGossipIDs(topic);
|
||||||
|
if (!messageIDs.length) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// shuffle to emit in random order
|
||||||
|
shuffle(messageIDs);
|
||||||
|
|
||||||
|
// if we are emitting more than GossipsubMaxIHaveLength ids, truncate the list
|
||||||
|
if (messageIDs.length > RelayMaxIHaveLength) {
|
||||||
|
// we do the truncation (with shuffling) per peer below
|
||||||
|
this.log(
|
||||||
|
'too many messages for gossip; will truncate IHAVE list (%d messages)',
|
||||||
|
messageIDs.length
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send gossip to GossipFactor peers above threshold with a minimum of D_lazy
|
||||||
|
// First we collect the peers above gossipThreshold that are not in the exclude set
|
||||||
|
// and then randomly select from that set
|
||||||
|
// We also exclude direct peers, as there is no reason to emit gossip to them
|
||||||
|
const peersToGossip: string[] = [];
|
||||||
|
const topicPeers = this.topics.get(topic);
|
||||||
|
if (!topicPeers) {
|
||||||
|
// no topic peers, no gossip
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
topicPeers.forEach((id) => {
|
||||||
|
const peerStreams = this.peers.get(id);
|
||||||
|
if (!peerStreams) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (
|
||||||
|
!exclude.has(id) &&
|
||||||
|
!this.direct.has(id) &&
|
||||||
|
peerStreams.protocol == RelayCodec &&
|
||||||
|
this.score.score(id) >= this._options.scoreThresholds.gossipThreshold
|
||||||
|
) {
|
||||||
|
peersToGossip.push(id);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let target = this._options.Dlazy;
|
||||||
|
const factor = RelayGossipFactor * peersToGossip.length;
|
||||||
|
if (factor > target) {
|
||||||
|
target = factor;
|
||||||
|
}
|
||||||
|
if (target > peersToGossip.length) {
|
||||||
|
target = peersToGossip.length;
|
||||||
|
} else {
|
||||||
|
shuffle(peersToGossip);
|
||||||
|
}
|
||||||
|
// Emit the IHAVE gossip to the selected peers up to the target
|
||||||
|
peersToGossip.slice(0, target).forEach((id) => {
|
||||||
|
let peerMessageIDs = messageIDs;
|
||||||
|
if (messageIDs.length > RelayMaxIHaveLength) {
|
||||||
|
// shuffle and slice message IDs per peer so that we emit a different set for each peer
|
||||||
|
// we have enough redundancy in the system that this will significantly increase the message
|
||||||
|
// coverage when we do truncate
|
||||||
|
peerMessageIDs = shuffle(peerMessageIDs.slice()).slice(
|
||||||
|
0,
|
||||||
|
RelayMaxIHaveLength
|
||||||
|
);
|
||||||
|
}
|
||||||
|
this._pushGossip(id, {
|
||||||
|
topicID: topic,
|
||||||
|
messageIDs: peerMessageIDs,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make a PRUNE control message for a peer in a topic
|
||||||
|
* @param {string} id
|
||||||
|
* @param {string} topic
|
||||||
|
* @param {boolean} doPX
|
||||||
|
* @returns {ControlPrune}
|
||||||
|
*/
|
||||||
|
_makePrune(id: string, topic: string, doPX: boolean): ControlPrune {
|
||||||
|
// backoff is measured in seconds
|
||||||
|
// RelayPruneBackoff is measured in milliseconds
|
||||||
|
const backoff = RelayPruneBackoff / 1000;
|
||||||
|
const px: PeerInfo[] = [];
|
||||||
|
if (doPX) {
|
||||||
|
// select peers for Peer eXchange
|
||||||
|
const peers = getRelayPeers(
|
||||||
|
this,
|
||||||
|
topic,
|
||||||
|
RelayPrunePeers,
|
||||||
|
(xid: string): boolean => {
|
||||||
|
return xid !== id && this.score.score(xid) >= 0;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
peers.forEach((p) => {
|
||||||
|
// see if we have a signed record to send back; if we don't, just send
|
||||||
|
// the peer ID and let the pruned peer find them in the DHT -- we can't trust
|
||||||
|
// unsigned address records through PX anyways
|
||||||
|
// Finding signed records in the DHT is not supported at the time of writing in js-libp2p
|
||||||
|
const peerId = PeerId.createFromB58String(p);
|
||||||
|
px.push({
|
||||||
|
peerID: peerId.toBytes(),
|
||||||
|
signedPeerRecord: this._libp2p.peerStore.addressBook.getRawEnvelope(
|
||||||
|
peerId
|
||||||
|
),
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
topicID: topic,
|
||||||
|
peers: px,
|
||||||
|
backoff: backoff,
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// This class provides an interface to execute the waku relay protocol
|
// This class provides an interface to execute the waku relay protocol
|
||||||
|
|
Loading…
Reference in New Issue