mirror of https://github.com/waku-org/js-waku.git
Upgrade libp2p-gossipsub
This commit is contained in:
parent
98c93c8283
commit
8e5318dc4a
File diff suppressed because it is too large
Load Diff
|
@ -66,6 +66,7 @@
|
||||||
"node": ">=16"
|
"node": ">=16"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"@chainsafe/libp2p-gossipsub": "^2.0.0",
|
||||||
"@chainsafe/libp2p-noise": "^5.0.0",
|
"@chainsafe/libp2p-noise": "^5.0.0",
|
||||||
"@ethersproject/rlp": "^5.5.0",
|
"@ethersproject/rlp": "^5.5.0",
|
||||||
"@libp2p/interface-peer-id": "^1.0.2",
|
"@libp2p/interface-peer-id": "^1.0.2",
|
||||||
|
@ -74,7 +75,7 @@
|
||||||
"@libp2p/peer-id": "^1.1.10",
|
"@libp2p/peer-id": "^1.1.10",
|
||||||
"@multiformats/multiaddr": "^10.2.0",
|
"@multiformats/multiaddr": "^10.2.0",
|
||||||
"@noble/secp256k1": "^1.3.4",
|
"@noble/secp256k1": "^1.3.4",
|
||||||
"debug": "^4.3.1",
|
"debug": "^4.3.4",
|
||||||
"dns-query": "^0.11.1",
|
"dns-query": "^0.11.1",
|
||||||
"hi-base32": "^0.5.1",
|
"hi-base32": "^0.5.1",
|
||||||
"it-concat": "^2.0.0",
|
"it-concat": "^2.0.0",
|
||||||
|
@ -84,7 +85,6 @@
|
||||||
"libp2p": "^0.37.3",
|
"libp2p": "^0.37.3",
|
||||||
"libp2p-bootstrap": "^0.14.0",
|
"libp2p-bootstrap": "^0.14.0",
|
||||||
"libp2p-crypto": "^0.21.2",
|
"libp2p-crypto": "^0.21.2",
|
||||||
"libp2p-gossipsub": "0.13.0",
|
|
||||||
"libp2p-mplex": "^0.10.4",
|
"libp2p-mplex": "^0.10.4",
|
||||||
"libp2p-websockets": "^0.16.1",
|
"libp2p-websockets": "^0.16.1",
|
||||||
"multiformats": "^9.6.5",
|
"multiformats": "^9.6.5",
|
||||||
|
@ -101,6 +101,7 @@
|
||||||
"@size-limit/preset-big-lib": "^7.0.8",
|
"@size-limit/preset-big-lib": "^7.0.8",
|
||||||
"@types/app-root-path": "^1.2.4",
|
"@types/app-root-path": "^1.2.4",
|
||||||
"@types/chai": "^4.2.15",
|
"@types/chai": "^4.2.15",
|
||||||
|
"@types/debug": "^4.1.7",
|
||||||
"@types/mocha": "^9.1.0",
|
"@types/mocha": "^9.1.0",
|
||||||
"@types/node": "^17.0.6",
|
"@types/node": "^17.0.6",
|
||||||
"@types/secp256k1": "^4.0.2",
|
"@types/secp256k1": "^4.0.2",
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import { shuffle } from "libp2p-gossipsub/src/utils/index";
|
import { shuffle } from "@chainsafe/libp2p-gossipsub/utils/shuffle";
|
||||||
|
|
||||||
export function getPseudoRandomSubset<T>(
|
export function getPseudoRandomSubset<T>(
|
||||||
values: T[],
|
values: T[],
|
||||||
|
|
|
@ -201,7 +201,7 @@ describe("Wait for remote peer / get peers", function () {
|
||||||
const nimPeerId = multiAddrWithId.getPeerId();
|
const nimPeerId = multiAddrWithId.getPeerId();
|
||||||
|
|
||||||
expect(nimPeerId).to.not.be.undefined;
|
expect(nimPeerId).to.not.be.undefined;
|
||||||
expect(peers.has(nimPeerId as string)).to.be.true;
|
expect(peers).to.includes(nimPeerId);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Relay - dialed after", async function () {
|
it("Relay - dialed after", async function () {
|
||||||
|
@ -219,11 +219,12 @@ describe("Wait for remote peer / get peers", function () {
|
||||||
await waku.dial(multiAddrWithId);
|
await waku.dial(multiAddrWithId);
|
||||||
await waitPromise;
|
await waitPromise;
|
||||||
|
|
||||||
|
// TODO: Should getMeshPeers be used instead?
|
||||||
const peers = waku.relay.getPeers();
|
const peers = waku.relay.getPeers();
|
||||||
const nimPeerId = multiAddrWithId.getPeerId();
|
const nimPeerId = multiAddrWithId.getPeerId();
|
||||||
|
|
||||||
expect(nimPeerId).to.not.be.undefined;
|
expect(nimPeerId).to.not.be.undefined;
|
||||||
expect(peers.has(nimPeerId as string)).to.be.true;
|
expect(peers).includes(nimPeerId);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Relay - times out", function (done) {
|
it("Relay - times out", function (done) {
|
||||||
|
|
|
@ -128,8 +128,7 @@ export class Waku {
|
||||||
options.relayKeepAlive || DefaultRelayKeepAliveValueSecs;
|
options.relayKeepAlive || DefaultRelayKeepAliveValueSecs;
|
||||||
|
|
||||||
libp2p.connectionManager.addEventListener("peer:connect", (evt) => {
|
libp2p.connectionManager.addEventListener("peer:connect", (evt) => {
|
||||||
const { detail: connection } = evt;
|
this.startKeepAlive(evt.detail.remotePeer, pingKeepAlive, relayKeepAlive);
|
||||||
this.startKeepAlive(connection.remotePeer, pingKeepAlive, relayKeepAlive);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -144,8 +143,7 @@ export class Waku {
|
||||||
* @see https://github.com/libp2p/js-libp2p/blob/bad9e8c0ff58d60a78314077720c82ae331cc55b/doc/API.md?plain=1#L2100
|
* @see https://github.com/libp2p/js-libp2p/blob/bad9e8c0ff58d60a78314077720c82ae331cc55b/doc/API.md?plain=1#L2100
|
||||||
*/
|
*/
|
||||||
libp2p.connectionManager.addEventListener("peer:disconnect", (evt) => {
|
libp2p.connectionManager.addEventListener("peer:disconnect", (evt) => {
|
||||||
const { detail: connection } = evt;
|
this.stopKeepAlive(evt.detail.remotePeer);
|
||||||
this.stopKeepAlive(connection.remotePeer);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
options?.decryptionKeys?.forEach((key) => {
|
options?.decryptionKeys?.forEach((key) => {
|
||||||
|
@ -372,10 +370,21 @@ export class Waku {
|
||||||
|
|
||||||
if (protocols.includes(Protocols.Store)) {
|
if (protocols.includes(Protocols.Store)) {
|
||||||
const storePromise = (async (): Promise<void> => {
|
const storePromise = (async (): Promise<void> => {
|
||||||
for await (const peer of this.store.peers) {
|
const peers = await this.store.peers();
|
||||||
dbg("Store peer found", peer.id.toString());
|
|
||||||
break;
|
if (peers.length) {
|
||||||
|
dbg("Store peer found: ", peers[0].id.toString());
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
this.libp2p.peerStore.addEventListener("change:protocols", (evt) => {
|
||||||
|
if (evt.detail.protocols.includes(LightPushCodec)) {
|
||||||
|
dbg("Resolving for", LightPushCodec, evt.detail.protocols);
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
})();
|
})();
|
||||||
promises.push(storePromise);
|
promises.push(storePromise);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,48 +0,0 @@
|
||||||
import Gossipsub from "libp2p-gossipsub";
|
|
||||||
import { shuffle } from "libp2p-gossipsub/src/utils";
|
|
||||||
|
|
||||||
import { RelayCodecs } from "./constants";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Given a topic, returns up to count peers subscribed to that topic
|
|
||||||
* that pass an optional filter function
|
|
||||||
*
|
|
||||||
* @param {Gossipsub} router
|
|
||||||
* @param {String} topic
|
|
||||||
* @param {Number} count
|
|
||||||
* @param {Function} [filter] a function to filter acceptable peers
|
|
||||||
* @returns {Set<string>}
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
export function getRelayPeers(
|
|
||||||
router: Gossipsub,
|
|
||||||
topic: string,
|
|
||||||
count: number,
|
|
||||||
filter: (id: string) => boolean = (): boolean => true
|
|
||||||
): Set<string> {
|
|
||||||
const peersInTopic = router.topics.get(topic);
|
|
||||||
if (!peersInTopic) {
|
|
||||||
return new Set();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Adds all peers using our protocol
|
|
||||||
// that also pass the filter function
|
|
||||||
let peers: string[] = [];
|
|
||||||
peersInTopic.forEach((id: string) => {
|
|
||||||
const peerStreams = router.peers.get(id);
|
|
||||||
if (!peerStreams) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (RelayCodecs.includes(peerStreams.protocol) && filter(id)) {
|
|
||||||
peers.push(id);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Pseudo-randomly shuffles peers
|
|
||||||
peers = shuffle(peers);
|
|
||||||
if (count > 0 && peers.length > count) {
|
|
||||||
peers = peers.slice(0, count);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new Set(peers);
|
|
||||||
}
|
|
|
@ -1,17 +1,10 @@
|
||||||
import debug from "debug";
|
|
||||||
import { Libp2p } from "libp2p";
|
|
||||||
import Gossipsub from "libp2p-gossipsub";
|
|
||||||
import { AddrInfo, MessageIdFunction } from "libp2p-gossipsub/src/interfaces";
|
|
||||||
import { MessageCache } from "libp2p-gossipsub/src/message-cache";
|
|
||||||
import { RPC } from "libp2p-gossipsub/src/message/rpc";
|
|
||||||
import {
|
import {
|
||||||
PeerScoreParams,
|
GossipSub,
|
||||||
PeerScoreThresholds,
|
GossipsubMessage,
|
||||||
} from "libp2p-gossipsub/src/score";
|
GossipsubOpts,
|
||||||
import { createGossipRpc, shuffle } from "libp2p-gossipsub/src/utils";
|
} from "@chainsafe/libp2p-gossipsub";
|
||||||
import { InMessage } from "libp2p-interfaces/src/pubsub";
|
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
|
||||||
import { SignaturePolicy } from "libp2p-interfaces/src/pubsub/signature-policy";
|
import debug from "debug";
|
||||||
import PeerId from "peer-id";
|
|
||||||
|
|
||||||
import { DefaultPubSubTopic } from "../constants";
|
import { DefaultPubSubTopic } from "../constants";
|
||||||
import { hexToBytes } from "../utils";
|
import { hexToBytes } from "../utils";
|
||||||
|
@ -19,35 +12,9 @@ import { CreateOptions } from "../waku";
|
||||||
import { DecryptionMethod, WakuMessage } from "../waku_message";
|
import { DecryptionMethod, WakuMessage } from "../waku_message";
|
||||||
|
|
||||||
import * as constants from "./constants";
|
import * as constants from "./constants";
|
||||||
import { getRelayPeers } from "./get_relay_peers";
|
|
||||||
import { RelayHeartbeat } from "./relay_heartbeat";
|
|
||||||
|
|
||||||
const dbg = debug("waku:relay");
|
const dbg = debug("waku:relay");
|
||||||
|
|
||||||
/**
|
|
||||||
* See constructor libp2p-gossipsub [API](https://github.com/ChainSafe/js-libp2p-gossipsub#api).
|
|
||||||
*/
|
|
||||||
export interface GossipOptions {
|
|
||||||
emitSelf: boolean;
|
|
||||||
gossipIncoming: boolean;
|
|
||||||
fallbackToFloodsub: boolean;
|
|
||||||
floodPublish: boolean;
|
|
||||||
doPX: boolean;
|
|
||||||
msgIdFn: MessageIdFunction;
|
|
||||||
messageCache: MessageCache;
|
|
||||||
// This option is always overridden
|
|
||||||
// globalSignaturePolicy: string;
|
|
||||||
scoreParams: Partial<PeerScoreParams>;
|
|
||||||
scoreThresholds: Partial<PeerScoreThresholds>;
|
|
||||||
directPeers: AddrInfo[];
|
|
||||||
D: number;
|
|
||||||
Dlo: number;
|
|
||||||
Dhi: number;
|
|
||||||
Dscore: number;
|
|
||||||
Dout: number;
|
|
||||||
Dlazy: number;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements the [Waku v2 Relay protocol]{@link https://rfc.vac.dev/spec/11/}.
|
* Implements the [Waku v2 Relay protocol]{@link https://rfc.vac.dev/spec/11/}.
|
||||||
* Must be passed as a `pubsub` module to a {Libp2p} instance.
|
* Must be passed as a `pubsub` module to a {Libp2p} instance.
|
||||||
|
@ -55,9 +22,9 @@ export interface GossipOptions {
|
||||||
* @implements {require('libp2p-interfaces/src/pubsub')}
|
* @implements {require('libp2p-interfaces/src/pubsub')}
|
||||||
* @noInheritDoc
|
* @noInheritDoc
|
||||||
*/
|
*/
|
||||||
export class WakuRelay extends Gossipsub {
|
export class WakuRelay extends GossipSub {
|
||||||
heartbeat: RelayHeartbeat;
|
|
||||||
pubSubTopic: string;
|
pubSubTopic: string;
|
||||||
|
public static multicodec: string = constants.RelayCodecs[0];
|
||||||
|
|
||||||
public decryptionKeys: Map<
|
public decryptionKeys: Map<
|
||||||
Uint8Array,
|
Uint8Array,
|
||||||
|
@ -72,26 +39,19 @@ export class WakuRelay extends Gossipsub {
|
||||||
[contentTopic: string]: Set<(message: WakuMessage) => void>;
|
[contentTopic: string]: Set<(message: WakuMessage) => void>;
|
||||||
};
|
};
|
||||||
|
|
||||||
constructor(
|
constructor(options?: Partial<CreateOptions & GossipsubOpts>) {
|
||||||
libp2p: Libp2p,
|
|
||||||
options?: Partial<CreateOptions & GossipOptions>
|
|
||||||
) {
|
|
||||||
super(
|
super(
|
||||||
libp2p,
|
Object.assign(options, {
|
||||||
Object.assign(options ?? {}, {
|
|
||||||
// Ensure that no signature is included nor expected in the messages.
|
// Ensure that no signature is included nor expected in the messages.
|
||||||
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
|
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
|
||||||
|
fallbackToFloodsub: false,
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
this.multicodecs = constants.RelayCodecs;
|
||||||
|
|
||||||
this.heartbeat = new RelayHeartbeat(this);
|
|
||||||
this.observers = {};
|
this.observers = {};
|
||||||
this.decryptionKeys = new Map();
|
this.decryptionKeys = new Map();
|
||||||
|
|
||||||
const multicodecs = constants.RelayCodecs;
|
|
||||||
|
|
||||||
Object.assign(this, { multicodecs });
|
|
||||||
|
|
||||||
this.pubSubTopic = options?.pubSubTopic || DefaultPubSubTopic;
|
this.pubSubTopic = options?.pubSubTopic || DefaultPubSubTopic;
|
||||||
|
|
||||||
options?.decryptionKeys?.forEach((key) => {
|
options?.decryptionKeys?.forEach((key) => {
|
||||||
|
@ -119,7 +79,7 @@ export class WakuRelay extends Gossipsub {
|
||||||
*/
|
*/
|
||||||
public async send(message: WakuMessage): Promise<void> {
|
public async send(message: WakuMessage): Promise<void> {
|
||||||
const msg = message.encode();
|
const msg = message.encode();
|
||||||
await super.publish(this.pubSubTopic, msg);
|
await this.publish(this.pubSubTopic, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -195,338 +155,57 @@ export class WakuRelay extends Gossipsub {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the relay peers we are connected to, and we would publish a message to
|
|
||||||
*/
|
|
||||||
getPeers(): Set<string> {
|
|
||||||
return getRelayPeers(this, this.pubSubTopic, this._options.D, (id) => {
|
|
||||||
// Filter peers we would not publish to
|
|
||||||
return (
|
|
||||||
this.score.score(id) >= this._options.scoreThresholds.publishThreshold
|
|
||||||
);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subscribe to a pubsub topic and start emitting Waku messages to observers.
|
* Subscribe to a pubsub topic and start emitting Waku messages to observers.
|
||||||
*
|
*
|
||||||
* @override
|
* @override
|
||||||
*/
|
*/
|
||||||
subscribe(pubSubTopic: string): void {
|
subscribe(pubSubTopic: string): void {
|
||||||
this.on(pubSubTopic, (event) => {
|
this.addEventListener(
|
||||||
const decryptionKeys = Array.from(this.decryptionKeys).map(
|
"gossipsub:message",
|
||||||
([key, { method, contentTopics }]) => {
|
(event: CustomEvent<GossipsubMessage>) => {
|
||||||
return {
|
if (event.detail.msg.topic === pubSubTopic) {
|
||||||
key,
|
const decryptionKeys = Array.from(this.decryptionKeys).map(
|
||||||
method,
|
([key, { method, contentTopics }]) => {
|
||||||
contentTopics,
|
return {
|
||||||
};
|
key,
|
||||||
}
|
method,
|
||||||
);
|
contentTopics,
|
||||||
|
};
|
||||||
dbg(`Message received on ${pubSubTopic}`);
|
|
||||||
WakuMessage.decode(event.data, decryptionKeys)
|
|
||||||
.then((wakuMsg) => {
|
|
||||||
if (!wakuMsg) {
|
|
||||||
dbg("Failed to decode Waku Message");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.observers[""]) {
|
|
||||||
this.observers[""].forEach((callbackFn) => {
|
|
||||||
callbackFn(wakuMsg);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
if (wakuMsg.contentTopic) {
|
|
||||||
if (this.observers[wakuMsg.contentTopic]) {
|
|
||||||
this.observers[wakuMsg.contentTopic].forEach((callbackFn) => {
|
|
||||||
callbackFn(wakuMsg);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
);
|
||||||
})
|
|
||||||
.catch((e) => {
|
dbg(`Message received on ${pubSubTopic}`);
|
||||||
dbg("Failed to decode Waku Message", e);
|
WakuMessage.decode(event.detail.msg.data, decryptionKeys)
|
||||||
});
|
.then((wakuMsg) => {
|
||||||
});
|
if (!wakuMsg) {
|
||||||
|
dbg("Failed to decode Waku Message");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.observers[""]) {
|
||||||
|
this.observers[""].forEach((callbackFn) => {
|
||||||
|
callbackFn(wakuMsg);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if (wakuMsg.contentTopic) {
|
||||||
|
if (this.observers[wakuMsg.contentTopic]) {
|
||||||
|
this.observers[wakuMsg.contentTopic].forEach((callbackFn) => {
|
||||||
|
callbackFn(wakuMsg);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.catch((e) => {
|
||||||
|
dbg("Failed to decode Waku Message", e);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
super.subscribe(pubSubTopic);
|
super.subscribe(pubSubTopic);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
// TODO: Implement method that uses Relay codec
|
||||||
* Join pubsub topic.
|
// public async heartbeat(): Promise<void> {
|
||||||
* This is present to override the behavior of Gossipsub and should not
|
|
||||||
* be used by API Consumers
|
|
||||||
*
|
|
||||||
* @internal
|
|
||||||
* @param {string} topic
|
|
||||||
* @returns {void}
|
|
||||||
* @override
|
|
||||||
*/
|
|
||||||
join(topic: string): void {
|
|
||||||
if (!this.started) {
|
|
||||||
throw new Error("WakuRelayPubSub has not started");
|
|
||||||
}
|
|
||||||
|
|
||||||
const fanoutPeers = this.fanout.get(topic);
|
|
||||||
if (fanoutPeers) {
|
|
||||||
// these peers have a score above the publish threshold, which may be negative
|
|
||||||
// so drop the ones with a negative score
|
|
||||||
fanoutPeers.forEach((id) => {
|
|
||||||
if (this.score.score(id) < 0) {
|
|
||||||
fanoutPeers.delete(id);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
if (fanoutPeers.size < this._options.D) {
|
|
||||||
// we need more peers; eager, as this would get fixed in the next heartbeat
|
|
||||||
getRelayPeers(
|
|
||||||
this,
|
|
||||||
topic,
|
|
||||||
this._options.D - fanoutPeers.size,
|
|
||||||
(id: string): boolean => {
|
|
||||||
// filter our current peers, direct peers, and peers with negative scores
|
|
||||||
return (
|
|
||||||
!fanoutPeers.has(id) &&
|
|
||||||
!this.direct.has(id) &&
|
|
||||||
this.score.score(id) >= 0
|
|
||||||
);
|
|
||||||
}
|
|
||||||
).forEach((id) => fanoutPeers.add(id));
|
|
||||||
}
|
|
||||||
this.mesh.set(topic, fanoutPeers);
|
|
||||||
this.fanout.delete(topic);
|
|
||||||
this.lastpub.delete(topic);
|
|
||||||
} else {
|
|
||||||
const peers = getRelayPeers(
|
|
||||||
this,
|
|
||||||
topic,
|
|
||||||
this._options.D,
|
|
||||||
(id: string): boolean => {
|
|
||||||
// filter direct peers and peers with negative score
|
|
||||||
return !this.direct.has(id) && this.score.score(id) >= 0;
|
|
||||||
}
|
|
||||||
);
|
|
||||||
this.mesh.set(topic, peers);
|
|
||||||
}
|
|
||||||
this.mesh.get(topic)?.forEach((id) => {
|
|
||||||
this.log("JOIN: Add mesh link to %s in %s", id, topic);
|
|
||||||
this._sendGraft(id, topic);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Publish messages.
|
|
||||||
* This is present to override the behavior of Gossipsub and should not
|
|
||||||
* be used by API Consumers
|
|
||||||
*
|
|
||||||
* @ignore
|
|
||||||
* @override
|
|
||||||
* @param {InMessage} msg
|
|
||||||
* @returns {void}
|
|
||||||
*/
|
|
||||||
async _publish(msg: InMessage): Promise<void> {
|
|
||||||
const msgIdStr = await this.getCanonicalMsgIdStr(msg);
|
|
||||||
if (msg.receivedFrom !== this.peerId.toString()) {
|
|
||||||
this.score.deliverMessage(msg, msgIdStr);
|
|
||||||
this.gossipTracer.deliverMessage(msgIdStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
// put in seen cache
|
|
||||||
this.seenCache.put(msgIdStr);
|
|
||||||
|
|
||||||
this.messageCache.put(msg, msgIdStr);
|
|
||||||
|
|
||||||
const toSend = new Set<string>();
|
|
||||||
msg.topicIDs.forEach((topic) => {
|
|
||||||
const peersInTopic = this.topics.get(topic);
|
|
||||||
if (!peersInTopic) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// direct peers
|
|
||||||
this.direct.forEach((id) => {
|
|
||||||
toSend.add(id);
|
|
||||||
});
|
|
||||||
|
|
||||||
let meshPeers = this.mesh.get(topic);
|
|
||||||
if (!meshPeers || !meshPeers.size) {
|
|
||||||
// We are not in the mesh for topic, use fanout peers
|
|
||||||
meshPeers = this.fanout.get(topic);
|
|
||||||
if (!meshPeers) {
|
|
||||||
// If we are not in the fanout, then pick peers in topic above the publishThreshold
|
|
||||||
const peers = getRelayPeers(this, topic, this._options.D, (id) => {
|
|
||||||
return (
|
|
||||||
this.score.score(id) >=
|
|
||||||
this._options.scoreThresholds.publishThreshold
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
if (peers.size > 0) {
|
|
||||||
meshPeers = peers;
|
|
||||||
this.fanout.set(topic, peers);
|
|
||||||
} else {
|
|
||||||
meshPeers = new Set();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Store the latest publishing time
|
|
||||||
this.lastpub.set(topic, this._now());
|
|
||||||
}
|
|
||||||
|
|
||||||
meshPeers?.forEach((peer) => {
|
|
||||||
toSend.add(peer);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
// Publish messages to peers
|
|
||||||
const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]);
|
|
||||||
dbg(`Relay message to ${toSend.size} peers`);
|
|
||||||
toSend.forEach((id) => {
|
|
||||||
if (id === msg.from) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
dbg("Relay message to", id);
|
|
||||||
this._sendRpc(id, rpc);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Emits gossip to peers in a particular topic.
|
|
||||||
*
|
|
||||||
* This is present to override the behavior of Gossipsub and should not
|
|
||||||
* be used by API Consumers
|
|
||||||
*
|
|
||||||
* @ignore
|
|
||||||
* @override
|
|
||||||
* @param {string} topic
|
|
||||||
* @param {Set<string>} exclude peers to exclude
|
|
||||||
* @returns {void}
|
|
||||||
*/
|
|
||||||
_emitGossip(topic: string, exclude: Set<string>): void {
|
|
||||||
const messageIDs = this.messageCache.getGossipIDs(topic);
|
|
||||||
if (!messageIDs.length) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// shuffle to emit in random order
|
|
||||||
shuffle(messageIDs);
|
|
||||||
|
|
||||||
// if we are emitting more than GossipsubMaxIHaveLength ids, truncate the list
|
|
||||||
if (messageIDs.length > constants.RelayMaxIHaveLength) {
|
|
||||||
// we do the truncation (with shuffling) per peer below
|
|
||||||
this.log(
|
|
||||||
"too many messages for gossip; will truncate IHAVE list (%d messages)",
|
|
||||||
messageIDs.length
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send gossip to GossipFactor peers above threshold with a minimum of D_lazy
|
|
||||||
// First we collect the peers above gossipThreshold that are not in the exclude set
|
|
||||||
// and then randomly select from that set
|
|
||||||
// We also exclude direct peers, as there is no reason to emit gossip to them
|
|
||||||
const peersToGossip: string[] = [];
|
|
||||||
const topicPeers = this.topics.get(topic);
|
|
||||||
if (!topicPeers) {
|
|
||||||
// no topic peers, no gossip
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
topicPeers.forEach((id) => {
|
|
||||||
const peerStreams = this.peers.get(id);
|
|
||||||
if (!peerStreams) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (
|
|
||||||
!exclude.has(id) &&
|
|
||||||
!this.direct.has(id) &&
|
|
||||||
constants.RelayCodecs.includes(peerStreams.protocol) &&
|
|
||||||
this.score.score(id) >= this._options.scoreThresholds.gossipThreshold
|
|
||||||
) {
|
|
||||||
peersToGossip.push(id);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let target = this._options.Dlazy;
|
|
||||||
const factor = constants.RelayGossipFactor * peersToGossip.length;
|
|
||||||
if (factor > target) {
|
|
||||||
target = factor;
|
|
||||||
}
|
|
||||||
if (target > peersToGossip.length) {
|
|
||||||
target = peersToGossip.length;
|
|
||||||
} else {
|
|
||||||
shuffle(peersToGossip);
|
|
||||||
}
|
|
||||||
// Emit the IHAVE gossip to the selected peers up to the target
|
|
||||||
peersToGossip.slice(0, target).forEach((id) => {
|
|
||||||
let peerMessageIDs = messageIDs;
|
|
||||||
if (messageIDs.length > constants.RelayMaxIHaveLength) {
|
|
||||||
// shuffle and slice message IDs per peer so that we emit a different set for each peer
|
|
||||||
// we have enough redundancy in the system that this will significantly increase the message
|
|
||||||
// coverage when we do truncate
|
|
||||||
peerMessageIDs = shuffle(peerMessageIDs.slice()).slice(
|
|
||||||
0,
|
|
||||||
constants.RelayMaxIHaveLength
|
|
||||||
);
|
|
||||||
}
|
|
||||||
this._pushGossip(id, {
|
|
||||||
topicID: topic,
|
|
||||||
messageIDs: peerMessageIDs,
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Make a PRUNE control message for a peer in a topic.
|
|
||||||
* This is present to override the behavior of Gossipsub and should not
|
|
||||||
* be used by API Consumers
|
|
||||||
*
|
|
||||||
* @ignore
|
|
||||||
* @override
|
|
||||||
* @param {string} id
|
|
||||||
* @param {string} topic
|
|
||||||
* @param {boolean} doPX
|
|
||||||
* @returns {Promise<RPC.IControlPrune>}
|
|
||||||
*/
|
|
||||||
async _makePrune(
|
|
||||||
id: string,
|
|
||||||
topic: string,
|
|
||||||
doPX: boolean
|
|
||||||
): Promise<RPC.IControlPrune> {
|
|
||||||
// backoff is measured in seconds
|
|
||||||
// RelayPruneBackoff is measured in milliseconds
|
|
||||||
const backoff = constants.RelayPruneBackoff / 1000;
|
|
||||||
if (!doPX) {
|
|
||||||
return {
|
|
||||||
topicID: topic,
|
|
||||||
peers: [],
|
|
||||||
backoff: backoff,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
// select peers for Peer eXchange
|
|
||||||
const peers = getRelayPeers(
|
|
||||||
this,
|
|
||||||
topic,
|
|
||||||
constants.RelayPrunePeers,
|
|
||||||
(xid: string): boolean => {
|
|
||||||
return xid !== id && this.score.score(xid) >= 0;
|
|
||||||
}
|
|
||||||
);
|
|
||||||
const px = await Promise.all(
|
|
||||||
Array.from(peers).map(async (p) => {
|
|
||||||
// see if we have a signed record to send back; if we don't, just send
|
|
||||||
// the peer ID and let the pruned peer find them in the DHT -- we can't trust
|
|
||||||
// unsigned address records through PX anyways
|
|
||||||
// Finding signed records in the DHT is not supported at the time of writing in js-libp2p
|
|
||||||
const peerId = PeerId.createFromB58String(p);
|
|
||||||
return {
|
|
||||||
peerID: peerId.toBytes(),
|
|
||||||
signedPeerRecord:
|
|
||||||
await this._libp2p.peerStore.addressBook.getRawEnvelope(peerId),
|
|
||||||
};
|
|
||||||
})
|
|
||||||
);
|
|
||||||
return {
|
|
||||||
topicID: topic,
|
|
||||||
peers: px,
|
|
||||||
backoff: backoff,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,379 +0,0 @@
|
||||||
/**
|
|
||||||
* @hidden
|
|
||||||
* @module
|
|
||||||
*/
|
|
||||||
|
|
||||||
import Gossipsub from "libp2p-gossipsub";
|
|
||||||
import { Heartbeat } from "libp2p-gossipsub/src/heartbeat";
|
|
||||||
import { shuffle } from "libp2p-gossipsub/src/utils";
|
|
||||||
|
|
||||||
import * as constants from "./constants";
|
|
||||||
import { getRelayPeers } from "./get_relay_peers";
|
|
||||||
|
|
||||||
export class RelayHeartbeat extends Heartbeat {
|
|
||||||
/**
|
|
||||||
* @param {Object} gossipsub
|
|
||||||
* @constructor
|
|
||||||
*/
|
|
||||||
constructor(gossipsub: Gossipsub) {
|
|
||||||
super(gossipsub);
|
|
||||||
}
|
|
||||||
|
|
||||||
start(): void {
|
|
||||||
if (this._heartbeatTimer) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const heartbeat = this._heartbeat.bind(this);
|
|
||||||
|
|
||||||
const timeout = setTimeout(() => {
|
|
||||||
heartbeat();
|
|
||||||
this._heartbeatTimer?.runPeriodically(
|
|
||||||
heartbeat,
|
|
||||||
constants.RelayHeartbeatInterval
|
|
||||||
);
|
|
||||||
}, constants.RelayHeartbeatInitialDelay);
|
|
||||||
|
|
||||||
this._heartbeatTimer = {
|
|
||||||
_intervalId: undefined,
|
|
||||||
runPeriodically: (fn, period): void => {
|
|
||||||
// this._heartbeatTimer cannot be null, it is being assigned.
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
|
||||||
this._heartbeatTimer!._intervalId = setInterval(fn, period);
|
|
||||||
},
|
|
||||||
cancel: (): void => {
|
|
||||||
clearTimeout(timeout);
|
|
||||||
clearInterval(this._heartbeatTimer?._intervalId as NodeJS.Timeout);
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Unmounts the gossipsub protocol and shuts down every connection
|
|
||||||
* @override
|
|
||||||
* @returns {void}
|
|
||||||
*/
|
|
||||||
stop(): void {
|
|
||||||
if (!this._heartbeatTimer) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this._heartbeatTimer.cancel();
|
|
||||||
this._heartbeatTimer = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Maintains the mesh and fanout maps in gossipsub.
|
|
||||||
*
|
|
||||||
* @returns {void}
|
|
||||||
*/
|
|
||||||
_heartbeat(): void {
|
|
||||||
const { D, Dlo, Dhi, Dscore, Dout } = this.gossipsub._options;
|
|
||||||
this.gossipsub.heartbeatTicks++;
|
|
||||||
|
|
||||||
// cache scores through the heartbeat
|
|
||||||
const scores = new Map<string, number>();
|
|
||||||
const getScore = (id: string): number => {
|
|
||||||
let s = scores.get(id);
|
|
||||||
if (s === undefined) {
|
|
||||||
s = this.gossipsub.score.score(id);
|
|
||||||
scores.set(id, s);
|
|
||||||
}
|
|
||||||
return s;
|
|
||||||
};
|
|
||||||
|
|
||||||
// peer id => topic[]
|
|
||||||
const toGraft = new Map<string, string[]>();
|
|
||||||
// peer id => topic[]
|
|
||||||
const toPrune = new Map<string, string[]>();
|
|
||||||
// peer id => don't px
|
|
||||||
const noPX = new Map<string, boolean>();
|
|
||||||
|
|
||||||
// clean up expired backoffs
|
|
||||||
this.gossipsub._clearBackoff();
|
|
||||||
|
|
||||||
// clean up peerhave/iasked counters
|
|
||||||
this.gossipsub.peerhave.clear();
|
|
||||||
this.gossipsub.iasked.clear();
|
|
||||||
|
|
||||||
// apply IWANT request penalties
|
|
||||||
this.gossipsub._applyIwantPenalties();
|
|
||||||
|
|
||||||
// ensure direct peers are connected
|
|
||||||
this.gossipsub._directConnect();
|
|
||||||
|
|
||||||
// maintain the mesh for topics we have joined
|
|
||||||
this.gossipsub.mesh.forEach((peers, topic) => {
|
|
||||||
// prune/graft helper functions (defined per topic)
|
|
||||||
const prunePeer = (id: string): void => {
|
|
||||||
this.gossipsub.log(
|
|
||||||
"HEARTBEAT: Remove mesh link to %s in %s",
|
|
||||||
id,
|
|
||||||
topic
|
|
||||||
);
|
|
||||||
// update peer score
|
|
||||||
this.gossipsub.score.prune(id, topic);
|
|
||||||
// add prune backoff record
|
|
||||||
this.gossipsub._addBackoff(id, topic);
|
|
||||||
// remove peer from mesh
|
|
||||||
peers.delete(id);
|
|
||||||
// add to toPrune
|
|
||||||
const topics = toPrune.get(id);
|
|
||||||
if (!topics) {
|
|
||||||
toPrune.set(id, [topic]);
|
|
||||||
} else {
|
|
||||||
topics.push(topic);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
const graftPeer = (id: string): void => {
|
|
||||||
this.gossipsub.log("HEARTBEAT: Add mesh link to %s in %s", id, topic);
|
|
||||||
// update peer score
|
|
||||||
this.gossipsub.score.graft(id, topic);
|
|
||||||
// add peer to mesh
|
|
||||||
peers.add(id);
|
|
||||||
// add to toGraft
|
|
||||||
const topics = toGraft.get(id);
|
|
||||||
if (!topics) {
|
|
||||||
toGraft.set(id, [topic]);
|
|
||||||
} else {
|
|
||||||
topics.push(topic);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// drop all peers with negative score, without PX
|
|
||||||
peers.forEach((id) => {
|
|
||||||
const score = getScore(id);
|
|
||||||
if (score < 0) {
|
|
||||||
this.gossipsub.log(
|
|
||||||
"HEARTBEAT: Prune peer %s with negative score: score=%d, topic=%s",
|
|
||||||
id,
|
|
||||||
score,
|
|
||||||
topic
|
|
||||||
);
|
|
||||||
prunePeer(id);
|
|
||||||
noPX.set(id, true);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// do we have enough peers?
|
|
||||||
if (peers.size < Dlo) {
|
|
||||||
const backoff = this.gossipsub.backoff.get(topic);
|
|
||||||
const ineed = D - peers.size;
|
|
||||||
const peersSet = getRelayPeers(
|
|
||||||
this.gossipsub,
|
|
||||||
topic,
|
|
||||||
ineed,
|
|
||||||
(id: string) => {
|
|
||||||
// filter out mesh peers, direct peers, peers we are backing off, peers with negative score
|
|
||||||
return (
|
|
||||||
!peers.has(id) &&
|
|
||||||
!this.gossipsub.direct.has(id) &&
|
|
||||||
(!backoff || !backoff.has(id)) &&
|
|
||||||
getScore(id) >= 0
|
|
||||||
);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
peersSet.forEach(graftPeer);
|
|
||||||
}
|
|
||||||
|
|
||||||
// do we have to many peers?
|
|
||||||
if (peers.size > Dhi) {
|
|
||||||
let peersArray = Array.from(peers);
|
|
||||||
// sort by score
|
|
||||||
peersArray.sort((a, b) => getScore(b) - getScore(a));
|
|
||||||
// We keep the first D_score peers by score and the remaining up to D randomly
|
|
||||||
// under the constraint that we keep D_out peers in the mesh (if we have that many)
|
|
||||||
peersArray = peersArray
|
|
||||||
.slice(0, Dscore)
|
|
||||||
.concat(shuffle(peersArray.slice(Dscore)));
|
|
||||||
|
|
||||||
// count the outbound peers we are keeping
|
|
||||||
let outbound = 0;
|
|
||||||
peersArray.slice(0, D).forEach((p) => {
|
|
||||||
if (this.gossipsub.outbound.get(p)) {
|
|
||||||
outbound++;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// if it's less than D_out, bubble up some outbound peers from the random selection
|
|
||||||
if (outbound < Dout) {
|
|
||||||
const rotate = (i: number): void => {
|
|
||||||
// rotate the peersArray to the right and put the ith peer in the front
|
|
||||||
const p = peersArray[i];
|
|
||||||
for (let j = i; j > 0; j--) {
|
|
||||||
peersArray[j] = peersArray[j - 1];
|
|
||||||
}
|
|
||||||
peersArray[0] = p;
|
|
||||||
};
|
|
||||||
|
|
||||||
// first bubble up all outbound peers already in the selection to the front
|
|
||||||
if (outbound > 0) {
|
|
||||||
let ihave = outbound;
|
|
||||||
for (let i = 1; i < D && ihave > 0; i++) {
|
|
||||||
if (this.gossipsub.outbound.get(peersArray[i])) {
|
|
||||||
rotate(i);
|
|
||||||
ihave--;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// now bubble up enough outbound peers outside the selection to the front
|
|
||||||
let ineed = D - outbound;
|
|
||||||
for (let i = D; i < peersArray.length && ineed > 0; i++) {
|
|
||||||
if (this.gossipsub.outbound.get(peersArray[i])) {
|
|
||||||
rotate(i);
|
|
||||||
ineed--;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// prune the excess peers
|
|
||||||
peersArray.slice(D).forEach(prunePeer);
|
|
||||||
}
|
|
||||||
|
|
||||||
// do we have enough outbound peers?
|
|
||||||
if (peers.size >= Dlo) {
|
|
||||||
// count the outbound peers we have
|
|
||||||
let outbound = 0;
|
|
||||||
peers.forEach((p) => {
|
|
||||||
if (this.gossipsub.outbound.get(p)) {
|
|
||||||
outbound++;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// if it's less than D_out, select some peers with outbound connections and graft them
|
|
||||||
if (outbound < Dout) {
|
|
||||||
const ineed = Dout - outbound;
|
|
||||||
const backoff = this.gossipsub.backoff.get(topic);
|
|
||||||
getRelayPeers(this.gossipsub, topic, ineed, (id: string): boolean => {
|
|
||||||
// filter our current mesh peers, direct peers, peers we are backing off, peers with negative score
|
|
||||||
return (
|
|
||||||
!peers.has(id) &&
|
|
||||||
!this.gossipsub.direct.has(id) &&
|
|
||||||
(!backoff || !backoff.has(id)) &&
|
|
||||||
getScore(id) >= 0
|
|
||||||
);
|
|
||||||
}).forEach(graftPeer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// should we try to improve the mesh with opportunistic grafting?
|
|
||||||
if (
|
|
||||||
this.gossipsub.heartbeatTicks %
|
|
||||||
constants.RelayOpportunisticGraftTicks ===
|
|
||||||
0 &&
|
|
||||||
peers.size > 1
|
|
||||||
) {
|
|
||||||
// Opportunistic grafting works as follows: we check the median score of peers in the
|
|
||||||
// mesh; if this score is below the opportunisticGraftThreshold, we select a few peers at
|
|
||||||
// random with score over the median.
|
|
||||||
// The intention is to (slowly) improve an under performing mesh by introducing good
|
|
||||||
// scoring peers that may have been gossiping at us. This allows us to get out of sticky
|
|
||||||
// situations where we are stuck with poor peers and also recover from churn of good peers.
|
|
||||||
|
|
||||||
// now compute the median peer score in the mesh
|
|
||||||
const peersList = Array.from(peers).sort(
|
|
||||||
(a, b) => getScore(a) - getScore(b)
|
|
||||||
);
|
|
||||||
const medianIndex = Math.floor(peers.size / 2);
|
|
||||||
const medianScore = getScore(peersList[medianIndex]);
|
|
||||||
|
|
||||||
// if the median score is below the threshold, select a better peer (if any) and GRAFT
|
|
||||||
if (
|
|
||||||
medianScore <
|
|
||||||
this.gossipsub._options.scoreThresholds.opportunisticGraftThreshold
|
|
||||||
) {
|
|
||||||
const backoff = this.gossipsub.backoff.get(topic);
|
|
||||||
const peersToGraft = getRelayPeers(
|
|
||||||
this.gossipsub,
|
|
||||||
topic,
|
|
||||||
constants.RelayOpportunisticGraftPeers,
|
|
||||||
(id: string): boolean => {
|
|
||||||
// filter out current mesh peers, direct peers, peers we are backing off, peers below or at threshold
|
|
||||||
return (
|
|
||||||
peers.has(id) &&
|
|
||||||
!this.gossipsub.direct.has(id) &&
|
|
||||||
(!backoff || !backoff.has(id)) &&
|
|
||||||
getScore(id) > medianScore
|
|
||||||
);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
peersToGraft.forEach((id: string) => {
|
|
||||||
this.gossipsub.log(
|
|
||||||
"HEARTBEAT: Opportunistically graft peer %s on topic %s",
|
|
||||||
id,
|
|
||||||
topic
|
|
||||||
);
|
|
||||||
graftPeer(id);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2nd arg are mesh peers excluded from gossip. We have already pushed
|
|
||||||
// messages to them, so its redundant to gossip IHAVEs.
|
|
||||||
this.gossipsub._emitGossip(topic, peers);
|
|
||||||
});
|
|
||||||
|
|
||||||
// expire fanout for topics we haven't published to in a while
|
|
||||||
const now = this.gossipsub._now();
|
|
||||||
this.gossipsub.lastpub.forEach((lastpub, topic) => {
|
|
||||||
if (lastpub + constants.RelayFanoutTTL < now) {
|
|
||||||
this.gossipsub.fanout.delete(topic);
|
|
||||||
this.gossipsub.lastpub.delete(topic);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// maintain our fanout for topics we are publishing but we have not joined
|
|
||||||
this.gossipsub.fanout.forEach((fanoutPeers, topic) => {
|
|
||||||
// checks whether our peers are still in the topic and have a score above the publish threshold
|
|
||||||
const topicPeers = this.gossipsub.topics.get(topic);
|
|
||||||
fanoutPeers.forEach((id) => {
|
|
||||||
if (
|
|
||||||
!topicPeers?.has(id) ||
|
|
||||||
getScore(id) <
|
|
||||||
this.gossipsub._options.scoreThresholds.publishThreshold
|
|
||||||
) {
|
|
||||||
fanoutPeers.delete(id);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// do we need more peers?
|
|
||||||
if (fanoutPeers.size < D) {
|
|
||||||
const ineed = D - fanoutPeers.size;
|
|
||||||
const peersSet = getRelayPeers(
|
|
||||||
this.gossipsub,
|
|
||||||
topic,
|
|
||||||
ineed,
|
|
||||||
(id: string): boolean => {
|
|
||||||
// filter out existing fanout peers, direct peers, and peers with score above the publish threshold
|
|
||||||
return (
|
|
||||||
!fanoutPeers.has(id) &&
|
|
||||||
!this.gossipsub.direct.has(id) &&
|
|
||||||
getScore(id) >=
|
|
||||||
this.gossipsub._options.scoreThresholds.publishThreshold
|
|
||||||
);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
peersSet.forEach((id: string) => {
|
|
||||||
fanoutPeers.add(id);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2nd arg are fanout peers excluded from gossip.
|
|
||||||
// We have already pushed messages to them, so its redundant to gossip IHAVEs
|
|
||||||
this.gossipsub._emitGossip(topic, fanoutPeers);
|
|
||||||
});
|
|
||||||
|
|
||||||
// send coalesced GRAFT/PRUNE messages (will piggyback gossip)
|
|
||||||
this.gossipsub._sendGraftPrune(toGraft, toPrune, noPX);
|
|
||||||
|
|
||||||
// flush pending gossip that wasn't piggybacked above
|
|
||||||
this.gossipsub._flush();
|
|
||||||
|
|
||||||
// advance the message history window
|
|
||||||
this.gossipsub.messageCache.shift();
|
|
||||||
|
|
||||||
this.gossipsub.emit("gossipsub:heartbeat");
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue