mirror of https://github.com/waku-org/js-waku.git
Merge #22

22: Fix js to js test r=D4nte a=D4nte

Several flows in gossipsub check against the peer's protocol. We need to override these instances to ensure they check against the Waku relay protocol instead.

Fixes #7.

Co-authored-by: Franck Royer <franck@royer.one>
This commit is contained in: commit 3be57632f7

Changed files:
.cspell.json (12 changes)
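The gist of the fix: `libp2p-gossipsub` selects peers for mesh, fanout, gossip and PX by comparing each peer's negotiated protocol against gossipsub's own multicodec, so a relay built by subclassing it never selects peers that negotiated the Waku relay codec. Below is a minimal sketch of the check the overridden flows use instead (illustrative only, not code from this commit; the `isRelayPeer` helper is hypothetical):

```typescript
import Gossipsub from 'libp2p-gossipsub';

// The Waku relay codec introduced by this PR (see the constants module below).
const RelayCodec = '/vac/waku/relay/2.0.0-beta2';

// Hypothetical helper: keep a peer only if it negotiated the Waku relay
// protocol, mirroring the check in getRelayPeers and _emitGossip below.
// Stock gossipsub compares peerStreams.protocol against its own codec here.
function isRelayPeer(router: Gossipsub, id: string): boolean {
  const peerStreams = router.peers.get(id);
  return !!peerStreams && peerStreams.protocol === RelayCodec;
}
```

The actual overrides appear in `get_relay_peers`, `_emitGossip`, `_makePrune` and the relay heartbeat in the diffs below.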
.cspell.json:

@@ -3,6 +3,8 @@
   "$schema": "https://raw.githubusercontent.com/streetsidesoftware/cspell/master/cspell.schema.json",
   "language": "en",
   "words": [
+    "backoff",
+    "backoffs",
     "bitjson",
     "bitauth",
     "bufbuild",
@@ -11,6 +13,9 @@
     "codecov",
     "commitlint",
     "dependabot",
+    "Dlazy",
+    "Dout",
+    "Dscore",
     "editorconfig",
     "esnext",
     "execa",
@@ -18,6 +23,11 @@
     "fanout",
     "globby",
     "gossipsub",
+    "iasked",
+    "ihave",
+    "ihaves",
+    "ineed",
+    "iwant",
     "lastpub",
     "libauth",
     "libp",
@@ -28,6 +38,7 @@
     "mplex",
     "muxer",
     "nodekey",
+    "peerhave",
     "prettierignore",
     "protobuf",
     "protoc",
@@ -38,6 +49,7 @@
     "submodules",
     "transpiled",
     "typedoc",
+    "unmounts",
     "untracked",
     "upgrader",
     "waku",
README:

@@ -16,10 +16,12 @@ To run the chat app:

 ```shell
 npm install
-npm run chat:app -- --staticNode /ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ --listenAddr /ip4/0.0.0.0/tcp/55123
+npm run chat:app -- --staticNode /ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ
 ```

-The `--listenAddr` parameter is optional, however [NAT passthrough](https://github.com/status-im/js-waku/issues/12) is not yet supported, so you'll need the listening port to be open to receive messages from the fleet.
+You can also specify an optional `listenAddr` parameter (e.g. `--listenAddr /ip4/0.0.0.0/tcp/55123`).
+This is only useful if you want a remote node to dial to your chat app;
+it is not necessary in normal usage when you just connect to the fleet.

 ## Contributing

Chat app entry point:

@@ -3,8 +3,8 @@ import util from 'util';

 import Waku from '../lib/waku';
 import { WakuMessage } from '../lib/waku_message';
-import { TOPIC } from '../lib/waku_relay';
-import { delay } from '../test_utils/delay';
+import { RelayDefaultTopic } from '../lib/waku_relay';
+import { delay } from '../test_utils/';

 import { ChatMessage } from './chat_message';

@@ -29,7 +29,7 @@ import { ChatMessage } from './chat_message';

   // TODO: Bubble event to waku, infer topic, decode msg
   // Tracked with https://github.com/status-im/js-waku/issues/19
-  waku.libp2p.pubsub.on(TOPIC, (event) => {
+  waku.libp2p.pubsub.on(RelayDefaultTopic, (event) => {
     const wakuMsg = WakuMessage.decode(event.data);
     if (wakuMsg.payload) {
       const chatMsg = ChatMessage.decode(wakuMsg.payload);
lib/waku tests:

@@ -1,11 +1,9 @@
 import { expect } from 'chai';

-import { NOISE_KEY_1 } from '../test_utils/constants';
-import { makeLogFileName } from '../test_utils/log_file';
-import { NimWaku } from '../test_utils/nim_waku';
+import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../test_utils/';

 import Waku from './waku';
-import { CODEC } from './waku_relay';
+import { RelayCodec } from './waku_relay';

 describe('Waku', function () {
   describe('Interop: Nim', function () {

@@ -28,7 +26,7 @@ describe('Waku', function () {
       expect(nimPeers).to.deep.equal([
         {
           multiaddr: multiAddrWithId,
-          protocol: CODEC,
+          protocol: RelayCodec,
           connected: true,
         },
       ]);
lib/waku:

@@ -6,7 +6,7 @@ import TCP from 'libp2p-tcp';
 import Multiaddr from 'multiaddr';
 import PeerId from 'peer-id';

-import { CODEC, WakuRelay, WakuRelayPubsub } from './waku_relay';
+import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay';

 export interface CreateOptions {
   listenAddresses: string[];

@@ -56,12 +56,12 @@ export default class Waku {
    * @param peer The peer to dial
    */
   async dial(peer: PeerId | Multiaddr | string) {
-    return this.libp2p.dialProtocol(peer, CODEC);
+    return this.libp2p.dialProtocol(peer, RelayCodec);
   }

   async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) {
     this.libp2p.peerStore.addressBook.set(peerId, multiaddr);
-    await this.libp2p.dialProtocol(peerId, CODEC);
+    await this.libp2p.dialProtocol(peerId, RelayCodec);
   }

   async stop() {
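As a usage note (not part of the diff): after the rename, any peer dialed through the `Waku` class negotiates the Waku relay codec rather than a gossipsub codec. A hedged sketch:

```typescript
import Waku from './waku';

// Hypothetical usage of the dial API shown above: dial() accepts a PeerId,
// a Multiaddr or a plain multiaddr string and negotiates RelayCodec.
async function connectToPeer(waku: Waku, addr: string): Promise<void> {
  await waku.dial(addr);
}
```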
lib/waku_relay (old single-file module, deleted):

@@ -1,106 +0,0 @@
-import Gossipsub from 'libp2p-gossipsub';
-import { Libp2p } from 'libp2p-gossipsub/src/interfaces';
-import Pubsub from 'libp2p-interfaces/src/pubsub';
-import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
-
-import { getWakuPeers } from './get_waku_peers';
-import { WakuMessage } from './waku_message';
-
-export const CODEC = '/vac/waku/relay/2.0.0-beta2';
-
-// As per waku specs, the topic is fixed, value taken from nim-waku
-export const TOPIC = '/waku/2/default-waku/proto';
-
-// This is the class to pass to libp2p as pubsub protocol
-export class WakuRelayPubsub extends Gossipsub {
-  /**
-   *
-   * @param libp2p: Libp2p
-   */
-  constructor(libp2p: Libp2p) {
-    super(libp2p, {
-      emitSelf: false,
-      // Ensure that no signature is expected in the messages.
-      globalSignaturePolicy: SignaturePolicy.StrictNoSign,
-    });
-
-    const multicodecs = [CODEC];
-
-    // This is the downside of using `libp2p-gossipsub` instead of
-    // implementing WakuRelay from scratch.
-    Object.assign(this, { multicodecs });
-  }
-
-  /**
-   * Join topic
-   * @param {string} topic
-   * @returns {void}
-   * @override
-   */
-  join(topic: string): void {
-    if (!this.started) {
-      throw new Error('WakuRelayPubsub has not started');
-    }
-
-    const fanoutPeers = this.fanout.get(topic);
-    if (fanoutPeers) {
-      // these peers have a score above the publish threshold, which may be negative
-      // so drop the ones with a negative score
-      fanoutPeers.forEach((id) => {
-        if (this.score.score(id) < 0) {
-          fanoutPeers.delete(id);
-        }
-      });
-      if (fanoutPeers.size < this._options.D) {
-        // we need more peers; eager, as this would get fixed in the next heartbeat
-        getWakuPeers(
-          this,
-          topic,
-          this._options.D - fanoutPeers.size,
-          (id: string): boolean => {
-            // filter our current peers, direct peers, and peers with negative scores
-            return (
-              !fanoutPeers.has(id) &&
-              !this.direct.has(id) &&
-              this.score.score(id) >= 0
-            );
-          }
-        ).forEach((id) => fanoutPeers.add(id));
-      }
-      this.mesh.set(topic, fanoutPeers);
-      this.fanout.delete(topic);
-      this.lastpub.delete(topic);
-    } else {
-      const peers = getWakuPeers(
-        this,
-        topic,
-        this._options.D,
-        (id: string): boolean => {
-          // filter direct peers and peers with negative score
-          return !this.direct.has(id) && this.score.score(id) >= 0;
-        }
-      );
-      this.mesh.set(topic, peers);
-    }
-    this.mesh.get(topic)!.forEach((id) => {
-      this.log('JOIN: Add mesh link to %s in %s', id, topic);
-      this._sendGraft(id, topic);
-    });
-  }
-}
-
-// TODO: Implement dial for an address with format '/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAkyzsXzENw5XBDYEQQAeQTCYjBJpMLgBmEXuwbtcrgxBJ4'
-// This class provides an interface to execute the waku relay protocol
-export class WakuRelay {
-  constructor(private pubsub: Pubsub) {}
-
-  // At this stage we are always using the same topic so we do not pass it as a parameter
-  async subscribe() {
-    await this.pubsub.subscribe(TOPIC);
-  }
-
-  async publish(message: WakuMessage) {
-    const msg = message.toBinary();
-    await this.pubsub.publish(TOPIC, msg);
-  }
-}
lib/waku_relay/constants (new file):

@@ -0,0 +1,77 @@
+export const second = 1000;
+export const minute = 60 * second;
+
+/**
+ * RelayCodec is the libp2p identifier for the waku relay protocol
+ */
+export const RelayCodec = '/vac/waku/relay/2.0.0-beta2';
+
+/**
+ * RelayDefaultTopic is the default gossipsub topic to use for waku relay
+ */
+export const RelayDefaultTopic = '/waku/2/default-waku/proto';
+
+/**
+ * RelayGossipFactor affects how many peers we will emit gossip to at each heartbeat.
+ * We will send gossip to RelayGossipFactor * (total number of non-mesh peers), or
+ * RelayDlazy, whichever is greater.
+ */
+export declare const RelayGossipFactor = 0.25;
+
+/**
+ * GossipsubHeartbeatInitialDelay is the short delay before the heartbeat timer begins
+ * after the router is initialized.
+ */
+export const RelayHeartbeatInitialDelay = 100;
+
+/**
+ * RelayHeartbeatInterval controls the time between heartbeats.
+ */
+export const RelayHeartbeatInterval = second;
+
+/**
+ * RelayPrunePeers controls the number of peers to include in prune Peer eXchange.
+ * When we prune a peer that's eligible for PX (has a good score, etc), we will try to
+ * send them signed peer records for up to RelayPrunePeers other peers that we
+ * know of.
+ */
+export const RelayPrunePeers = 16;
+
+/**
+ * RelayPruneBackoff controls the backoff time for pruned peers. This is how long
+ * a peer must wait before attempting to graft into our mesh again after being pruned.
+ * When pruning a peer, we send them our value of RelayPruneBackoff so they know
+ * the minimum time to wait. Peers running older versions may not send a backoff time,
+ * so if we receive a prune message without one, we will wait at least RelayPruneBackoff
+ * before attempting to re-graft.
+ */
+export const RelayPruneBackoff = minute;
+
+/**
+ * RelayFanoutTTL controls how long we keep track of the fanout state. If it's been
+ * RelayFanoutTTL since we've published to a topic that we're not subscribed to,
+ * we'll delete the fanout map for that topic.
+ */
+export const RelayFanoutTTL = minute;
+
+/**
+ * RelayOpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh
+ * with opportunistic grafting. Every RelayOpportunisticGraftTicks we will attempt to select some
+ * high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls
+ * below a threshold
+ */
+export const RelayOpportunisticGraftTicks = 60;
+
+/**
+ * RelayOpportunisticGraftPeers is the number of peers to opportunistically graft.
+ */
+export const RelayOpportunisticGraftPeers = 2;
+
+/**
+ * RelayMaxIHaveLength is the maximum number of messages to include in an IHAVE message.
+ * Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
+ * peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
+ * default if your system is pushing more than 5000 messages in GossipsubHistoryGossip heartbeats;
+ * with the defaults this is 1666 messages/s.
+ */
+export declare const RelayMaxIHaveLength = 5000;
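To make the `RelayGossipFactor` comment concrete, this is the arithmetic `_emitGossip` (in the index module below) performs; the numbers are illustrative and `Dlazy` is a gossipsub option whose value is not fixed by this diff:

```typescript
// Illustrative arithmetic only (not part of the commit).
const RelayGossipFactor = 0.25; // documented above
const Dlazy = 6; // assumed gossipsub option value for this example
const eligibleNonMeshPeers = 40; // hypothetical count

// _emitGossip raises the target from Dlazy to the factor when the factor is larger.
let target = Dlazy;
const factor = RelayGossipFactor * eligibleNonMeshPeers; // 10
if (factor > target) {
  target = factor;
}
console.log(target); // 10 -> emit IHAVE gossip to 10 peers this heartbeat
```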
lib/waku_relay/get_relay_peers:

@@ -1,6 +1,6 @@
 import { shuffle } from 'libp2p-gossipsub/src/utils';

-import { CODEC, WakuRelayPubsub } from './waku_relay';
+import { RelayCodec, WakuRelayPubsub } from './index';

 /**
  * Given a topic, returns up to count peers subscribed to that topic

@@ -13,7 +13,7 @@ import { RelayCodec, WakuRelayPubsub } from './index';
  * @returns {Set<string>}
  *
  */
-export function getWakuPeers(
+export function getRelayPeers(
   router: WakuRelayPubsub,
   topic: string,
   count: number,

@@ -27,12 +27,12 @@ export function getRelayPeers(
   // Adds all peers using our protocol
   // that also pass the filter function
   let peers: string[] = [];
-  peersInTopic.forEach((id) => {
+  peersInTopic.forEach((id: string) => {
     const peerStreams = router.peers.get(id);
     if (!peerStreams) {
       return;
     }
-    if (peerStreams.protocol == CODEC && filter(id)) {
+    if (peerStreams.protocol == RelayCodec && filter(id)) {
       peers.push(id);
     }
   });
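A sketch of a call site for this helper, in the spirit of the calls made from the relay's join and heartbeat code below (the surrounding function is hypothetical):

```typescript
import { getRelayPeers } from './get_relay_peers';
import { RelayDefaultTopic, WakuRelayPubsub } from './index';

// Hypothetical helper: pick up to 6 peers on the default topic that speak the
// Waku relay codec (enforced inside getRelayPeers) and have a non-negative score.
function pickMeshCandidates(router: WakuRelayPubsub): Set<string> {
  return getRelayPeers(router, RelayDefaultTopic, 6, (id: string): boolean => {
    return router.score.score(id) >= 0;
  });
}
```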
lib/waku_relay tests:

@@ -1,14 +1,14 @@
 import { expect } from 'chai';
 import Pubsub from 'libp2p-interfaces/src/pubsub';

-import { NOISE_KEY_1, NOISE_KEY_2 } from '../test_utils/constants';
-import { delay } from '../test_utils/delay';
-import { makeLogFileName } from '../test_utils/log_file';
-import { NimWaku } from '../test_utils/nim_waku';
+import { NOISE_KEY_1, NOISE_KEY_2 } from '../../test_utils/constants';
+import { delay } from '../../test_utils/delay';
+import { makeLogFileName } from '../../test_utils/log_file';
+import { NimWaku } from '../../test_utils/nim_waku';
+import Waku from '../waku';
+import { WakuMessage } from '../waku_message';

-import Waku from './waku';
-import { WakuMessage } from './waku_message';
-import { CODEC, TOPIC } from './waku_relay';
+import { RelayCodec, RelayDefaultTopic } from './index';

 describe('Waku Relay', () => {
   afterEach(function () {

@@ -41,13 +41,15 @@ describe('Waku Relay', () => {

     await Promise.all([
       new Promise((resolve) =>
-        waku1.libp2p.pubsub.once('pubsub:subscription-change', (...args) =>
-          resolve(args)
+        waku1.libp2p.pubsub.once(
+          'pubsub:subscription-change',
+          (...args: any[]) => resolve(args)
         )
       ),
       new Promise((resolve) =>
-        waku2.libp2p.pubsub.once('pubsub:subscription-change', (...args) =>
-          resolve(args)
+        waku2.libp2p.pubsub.once(
+          'pubsub:subscription-change',
+          (...args: any[]) => resolve(args)
         )
       ),
     ]);

@@ -59,8 +61,8 @@ describe('Waku Relay', () => {
   });

   it('Subscribe', async function () {
-    const subscribers1 = waku1.libp2p.pubsub.getSubscribers(TOPIC);
-    const subscribers2 = waku2.libp2p.pubsub.getSubscribers(TOPIC);
+    const subscribers1 = waku1.libp2p.pubsub.getSubscribers(RelayDefaultTopic);
+    const subscribers2 = waku2.libp2p.pubsub.getSubscribers(RelayDefaultTopic);

     expect(subscribers1).to.contain(waku2.libp2p.peerId.toB58String());
     expect(subscribers2).to.contain(waku1.libp2p.peerId.toB58String());

@@ -69,28 +71,17 @@ describe('Waku Relay', () => {
   it('Register correct protocols', async function () {
     const protocols = Array.from(waku1.libp2p.upgrader.protocols.keys());

-    expect(protocols).to.contain(CODEC);
+    expect(protocols).to.contain(RelayCodec);
     expect(protocols.findIndex((value) => value.match(/sub/))).to.eq(-1);
   });

-  // TODO: Fix this
-  it.skip('Publish', async function () {
+  it('Publish', async function () {
     this.timeout(10000);

     const message = WakuMessage.fromUtf8String('JS to JS communication works');
-    // waku.libp2p.pubsub.globalSignaturePolicy = 'StrictSign';

     const receivedPromise = waitForNextData(waku2.libp2p.pubsub);

-    await Promise.all([
-      new Promise((resolve) =>
-        waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
-      ),
-      new Promise((resolve) =>
-        waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
-      ),
-    ]);
-
     await waku1.relay.publish(message);

     const receivedMsg = await receivedPromise;

@@ -133,7 +124,9 @@ describe('Waku Relay', () => {

     it('nim subscribes to js', async function () {
       const nimPeerId = await nimWaku.getPeerId();
-      const subscribers = waku.libp2p.pubsub.getSubscribers(TOPIC);
+      const subscribers = waku.libp2p.pubsub.getSubscribers(
+        RelayDefaultTopic
+      );

       expect(subscribers).to.contain(nimPeerId.toB58String());
     });

@@ -205,7 +198,9 @@ describe('Waku Relay', () => {
     });

     it('nim subscribes to js', async function () {
-      const subscribers = waku.libp2p.pubsub.getSubscribers(TOPIC);
+      const subscribers = waku.libp2p.pubsub.getSubscribers(
+        RelayDefaultTopic
+      );

       const nimPeerId = await nimWaku.getPeerId();
       expect(subscribers).to.contain(nimPeerId.toB58String());

@@ -274,7 +269,9 @@ describe('Waku Relay', () => {
     });

     it('nim subscribes to js', async function () {
-      const subscribers = waku.libp2p.pubsub.getSubscribers(TOPIC);
+      const subscribers = waku.libp2p.pubsub.getSubscribers(
+        RelayDefaultTopic
+      );

       const nimPeerId = await nimWaku.getPeerId();
       expect(subscribers).to.contain(nimPeerId.toB58String());

@@ -390,7 +387,7 @@ describe('Waku Relay', () => {

 function waitForNextData(pubsub: Pubsub): Promise<WakuMessage> {
   return new Promise((resolve) => {
-    pubsub.once(TOPIC, resolve);
+    pubsub.once(RelayDefaultTopic, resolve);
   }).then((msg: any) => {
     return WakuMessage.decode(msg.data);
   });
lib/waku_relay/index (new file):

@@ -0,0 +1,308 @@
+import Gossipsub from 'libp2p-gossipsub';
+import { Libp2p } from 'libp2p-gossipsub/src/interfaces';
+import { ControlPrune, PeerInfo } from 'libp2p-gossipsub/src/message';
+import {
+  createGossipRpc,
+  messageIdToString,
+  shuffle,
+} from 'libp2p-gossipsub/src/utils';
+import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub';
+import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
+import PeerId from 'peer-id';
+
+import { WakuMessage } from '../waku_message';
+
+import * as constants from './constants';
+import { getRelayPeers } from './get_relay_peers';
+import { RelayHeartbeat } from './relay_heartbeat';
+
+export * from './constants';
+export * from './relay_heartbeat';
+
+// This is the class to pass to libp2p as pubsub protocol
+export class WakuRelayPubsub extends Gossipsub {
+  heartbeat: RelayHeartbeat;
+
+  /**
+   *
+   * @param libp2p: Libp2p
+   */
+  constructor(libp2p: Libp2p) {
+    super(libp2p, {
+      emitSelf: false,
+      // Ensure that no signature is expected in the messages.
+      globalSignaturePolicy: SignaturePolicy.StrictNoSign,
+    });
+
+    this.heartbeat = new RelayHeartbeat(this);
+
+    const multicodecs = [constants.RelayCodec];
+
+    // This is the downside of using `libp2p-gossipsub` instead of
+    // implementing WakuRelay from scratch.
+    Object.assign(this, { multicodecs });
+  }
+
+  /**
+   * Join topic
+   * @param {string} topic
+   * @returns {void}
+   * @override
+   */
+  join(topic: string): void {
+    if (!this.started) {
+      throw new Error('WakuRelayPubsub has not started');
+    }
+
+    const fanoutPeers = this.fanout.get(topic);
+    if (fanoutPeers) {
+      // these peers have a score above the publish threshold, which may be negative
+      // so drop the ones with a negative score
+      fanoutPeers.forEach((id) => {
+        if (this.score.score(id) < 0) {
+          fanoutPeers.delete(id);
+        }
+      });
+      if (fanoutPeers.size < this._options.D) {
+        // we need more peers; eager, as this would get fixed in the next heartbeat
+        getRelayPeers(
+          this,
+          topic,
+          this._options.D - fanoutPeers.size,
+          (id: string): boolean => {
+            // filter our current peers, direct peers, and peers with negative scores
+            return (
+              !fanoutPeers.has(id) &&
+              !this.direct.has(id) &&
+              this.score.score(id) >= 0
+            );
+          }
+        ).forEach((id) => fanoutPeers.add(id));
+      }
+      this.mesh.set(topic, fanoutPeers);
+      this.fanout.delete(topic);
+      this.lastpub.delete(topic);
+    } else {
+      const peers = getRelayPeers(
+        this,
+        topic,
+        this._options.D,
+        (id: string): boolean => {
+          // filter direct peers and peers with negative score
+          return !this.direct.has(id) && this.score.score(id) >= 0;
+        }
+      );
+      this.mesh.set(topic, peers);
+    }
+    this.mesh.get(topic)!.forEach((id) => {
+      this.log('JOIN: Add mesh link to %s in %s', id, topic);
+      this._sendGraft(id, topic);
+    });
+  }
+
+  /**
+   * Publish messages
+   *
+   * @override
+   * @param {InMessage} msg
+   * @returns {void}
+   */
+  async _publish(msg: InMessage): Promise<void> {
+    if (msg.receivedFrom !== this.peerId.toB58String()) {
+      this.score.deliverMessage(msg);
+      this.gossipTracer.deliverMessage(msg);
+    }
+
+    const msgID = this.getMsgId(msg);
+    const msgIdStr = messageIdToString(msgID);
+    // put in seen cache
+    this.seenCache.put(msgIdStr);
+
+    this.messageCache.put(msg);
+
+    const toSend = new Set<string>();
+    msg.topicIDs.forEach((topic) => {
+      const peersInTopic = this.topics.get(topic);
+      if (!peersInTopic) {
+        return;
+      }
+
+      // direct peers
+      this.direct.forEach((id) => {
+        toSend.add(id);
+      });
+
+      let meshPeers = this.mesh.get(topic);
+      if (!meshPeers || !meshPeers.size) {
+        // We are not in the mesh for topic, use fanout peers
+        meshPeers = this.fanout.get(topic);
+        if (!meshPeers) {
+          // If we are not in the fanout, then pick peers in topic above the publishThreshold
+          const peers = getRelayPeers(this, topic, this._options.D, (id) => {
+            return (
+              this.score.score(id) >=
+              this._options.scoreThresholds.publishThreshold
+            );
+          });
+
+          if (peers.size > 0) {
+            meshPeers = peers;
+            this.fanout.set(topic, peers);
+          } else {
+            meshPeers = new Set();
+          }
+        }
+        // Store the latest publishing time
+        this.lastpub.set(topic, this._now());
+      }
+
+      meshPeers!.forEach((peer) => {
+        toSend.add(peer);
+      });
+    });
+    // Publish messages to peers
+    const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]);
+    toSend.forEach((id) => {
+      if (id === msg.from) {
+        return;
+      }
+      this._sendRpc(id, rpc);
+    });
+  }
+
+  /**
+   * Emits gossip to peers in a particular topic
+   * @param {string} topic
+   * @param {Set<string>} exclude peers to exclude
+   * @returns {void}
+   */
+  _emitGossip(topic: string, exclude: Set<string>): void {
+    const messageIDs = this.messageCache.getGossipIDs(topic);
+    if (!messageIDs.length) {
+      return;
+    }
+
+    // shuffle to emit in random order
+    shuffle(messageIDs);
+
+    // if we are emitting more than GossipsubMaxIHaveLength ids, truncate the list
+    if (messageIDs.length > constants.RelayMaxIHaveLength) {
+      // we do the truncation (with shuffling) per peer below
+      this.log(
+        'too many messages for gossip; will truncate IHAVE list (%d messages)',
+        messageIDs.length
+      );
+    }
+
+    // Send gossip to GossipFactor peers above threshold with a minimum of D_lazy
+    // First we collect the peers above gossipThreshold that are not in the exclude set
+    // and then randomly select from that set
+    // We also exclude direct peers, as there is no reason to emit gossip to them
+    const peersToGossip: string[] = [];
+    const topicPeers = this.topics.get(topic);
+    if (!topicPeers) {
+      // no topic peers, no gossip
+      return;
+    }
+    topicPeers.forEach((id) => {
+      const peerStreams = this.peers.get(id);
+      if (!peerStreams) {
+        return;
+      }
+      if (
+        !exclude.has(id) &&
+        !this.direct.has(id) &&
+        peerStreams.protocol == constants.RelayCodec &&
+        this.score.score(id) >= this._options.scoreThresholds.gossipThreshold
+      ) {
+        peersToGossip.push(id);
+      }
+    });
+
+    let target = this._options.Dlazy;
+    const factor = constants.RelayGossipFactor * peersToGossip.length;
+    if (factor > target) {
+      target = factor;
+    }
+    if (target > peersToGossip.length) {
+      target = peersToGossip.length;
+    } else {
+      shuffle(peersToGossip);
+    }
+    // Emit the IHAVE gossip to the selected peers up to the target
+    peersToGossip.slice(0, target).forEach((id) => {
+      let peerMessageIDs = messageIDs;
+      if (messageIDs.length > constants.RelayMaxIHaveLength) {
+        // shuffle and slice message IDs per peer so that we emit a different set for each peer
+        // we have enough redundancy in the system that this will significantly increase the message
+        // coverage when we do truncate
+        peerMessageIDs = shuffle(peerMessageIDs.slice()).slice(
+          0,
+          constants.RelayMaxIHaveLength
+        );
+      }
+      this._pushGossip(id, {
+        topicID: topic,
+        messageIDs: peerMessageIDs,
+      });
+    });
+  }
+
+  /**
+   * Make a PRUNE control message for a peer in a topic
+   * @param {string} id
+   * @param {string} topic
+   * @param {boolean} doPX
+   * @returns {ControlPrune}
+   */
+  _makePrune(id: string, topic: string, doPX: boolean): ControlPrune {
+    // backoff is measured in seconds
+    // RelayPruneBackoff is measured in milliseconds
+    const backoff = constants.RelayPruneBackoff / 1000;
+    const px: PeerInfo[] = [];
+    if (doPX) {
+      // select peers for Peer eXchange
+      const peers = getRelayPeers(
+        this,
+        topic,
+        constants.RelayPrunePeers,
+        (xid: string): boolean => {
+          return xid !== id && this.score.score(xid) >= 0;
+        }
+      );
+      peers.forEach((p) => {
+        // see if we have a signed record to send back; if we don't, just send
+        // the peer ID and let the pruned peer find them in the DHT -- we can't trust
+        // unsigned address records through PX anyways
+        // Finding signed records in the DHT is not supported at the time of writing in js-libp2p
+        const peerId = PeerId.createFromB58String(p);
+        px.push({
+          peerID: peerId.toBytes(),
+          signedPeerRecord: this._libp2p.peerStore.addressBook.getRawEnvelope(
+            peerId
+          ),
+        });
+      });
+    }
+    return {
+      topicID: topic,
+      peers: px,
+      backoff: backoff,
+    };
+  }
+}
+
+// This class provides an interface to execute the waku relay protocol
+export class WakuRelay {
+  constructor(private pubsub: Pubsub) {}
+
+  // At this stage we are always using the same topic so we do not pass it as a parameter
+  async subscribe() {
+    await this.pubsub.subscribe(constants.RelayDefaultTopic);
+  }
+
+  async publish(message: WakuMessage) {
+    const msg = message.toBinary();
+    await this.pubsub.publish(constants.RelayDefaultTopic, msg);
+  }
+}
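For orientation, a minimal sketch of driving the `WakuRelay` facade defined at the end of this file; it assumes a started libp2p node whose pubsub router is the `WakuRelayPubsub` above (illustration, not code from the commit):

```typescript
import { WakuMessage } from '../waku_message';

import { WakuRelay, WakuRelayPubsub } from './index';

// Hypothetical usage: `pubsub` is the router instance that was handed to
// libp2p and has already been started.
async function sendHello(pubsub: WakuRelayPubsub): Promise<void> {
  const relay = new WakuRelay(pubsub);
  await relay.subscribe(); // joins the fixed default topic
  await relay.publish(WakuMessage.fromUtf8String('hello from js-waku'));
}
```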
lib/waku_relay/relay_heartbeat (new file):

@@ -0,0 +1,372 @@
+import Gossipsub from 'libp2p-gossipsub';
+import { Heartbeat } from 'libp2p-gossipsub/src/heartbeat';
+import { shuffle } from 'libp2p-gossipsub/src/utils';
+
+import * as constants from './constants';
+import { getRelayPeers } from './get_relay_peers';
+
+export class RelayHeartbeat extends Heartbeat {
+  /**
+   * @param {Object} gossipsub
+   * @constructor
+   */
+  constructor(gossipsub: Gossipsub) {
+    super(gossipsub);
+  }
+
+  start(): void {
+    if (this._heartbeatTimer) {
+      return;
+    }
+
+    const heartbeat = this._heartbeat.bind(this);
+
+    const timeout = setTimeout(() => {
+      heartbeat();
+      this._heartbeatTimer!.runPeriodically(
+        heartbeat,
+        constants.RelayHeartbeatInterval
+      );
+    }, constants.RelayHeartbeatInitialDelay);
+
+    this._heartbeatTimer = {
+      _intervalId: undefined,
+      runPeriodically: (fn, period) => {
+        this._heartbeatTimer!._intervalId = setInterval(fn, period);
+      },
+      cancel: () => {
+        clearTimeout(timeout);
+        clearInterval(this._heartbeatTimer!._intervalId as NodeJS.Timeout);
+      },
+    };
+  }
+
+  /**
+   * Unmounts the gossipsub protocol and shuts down every connection
+   * @override
+   * @returns {void}
+   */
+  stop(): void {
+    if (!this._heartbeatTimer) {
+      return;
+    }
+
+    this._heartbeatTimer.cancel();
+    this._heartbeatTimer = null;
+  }
+
+  /**
+   * Maintains the mesh and fanout maps in gossipsub.
+   *
+   * @returns {void}
+   */
+  _heartbeat(): void {
+    const { D, Dlo, Dhi, Dscore, Dout } = this.gossipsub._options;
+    this.gossipsub.heartbeatTicks++;
+
+    // cache scores through the heartbeat
+    const scores = new Map<string, number>();
+    const getScore = (id: string): number => {
+      let s = scores.get(id);
+      if (s === undefined) {
+        s = this.gossipsub.score.score(id);
+        scores.set(id, s);
+      }
+      return s;
+    };
+
+    // peer id => topic[]
+    const toGraft = new Map<string, string[]>();
+    // peer id => topic[]
+    const toPrune = new Map<string, string[]>();
+    // peer id => don't px
+    const noPX = new Map<string, boolean>();
+
+    // clean up expired backoffs
+    this.gossipsub._clearBackoff();
+
+    // clean up peerhave/iasked counters
+    this.gossipsub.peerhave.clear();
+    this.gossipsub.iasked.clear();
+
+    // apply IWANT request penalties
+    this.gossipsub._applyIwantPenalties();
+
+    // ensure direct peers are connected
+    this.gossipsub._directConnect();
+
+    // maintain the mesh for topics we have joined
+    this.gossipsub.mesh.forEach((peers, topic) => {
+      // prune/graft helper functions (defined per topic)
+      const prunePeer = (id: string): void => {
+        this.gossipsub.log(
+          'HEARTBEAT: Remove mesh link to %s in %s',
+          id,
+          topic
+        );
+        // update peer score
+        this.gossipsub.score.prune(id, topic);
+        // add prune backoff record
+        this.gossipsub._addBackoff(id, topic);
+        // remove peer from mesh
+        peers.delete(id);
+        // add to toPrune
+        const topics = toPrune.get(id);
+        if (!topics) {
+          toPrune.set(id, [topic]);
+        } else {
+          topics.push(topic);
+        }
+      };
+      const graftPeer = (id: string): void => {
+        this.gossipsub.log('HEARTBEAT: Add mesh link to %s in %s', id, topic);
+        // update peer score
+        this.gossipsub.score.graft(id, topic);
+        // add peer to mesh
+        peers.add(id);
+        // add to toGraft
+        const topics = toGraft.get(id);
+        if (!topics) {
+          toGraft.set(id, [topic]);
+        } else {
+          topics.push(topic);
+        }
+      };
+
+      // drop all peers with negative score, without PX
+      peers.forEach((id) => {
+        const score = getScore(id);
+        if (score < 0) {
+          this.gossipsub.log(
+            'HEARTBEAT: Prune peer %s with negative score: score=%d, topic=%s',
+            id,
+            score,
+            topic
+          );
+          prunePeer(id);
+          noPX.set(id, true);
+        }
+      });
+
+      // do we have enough peers?
+      if (peers.size < Dlo) {
+        const backoff = this.gossipsub.backoff.get(topic);
+        const ineed = D - peers.size;
+        const peersSet = getRelayPeers(
+          this.gossipsub,
+          topic,
+          ineed,
+          (id: string) => {
+            // filter out mesh peers, direct peers, peers we are backing off, peers with negative score
+            return (
+              !peers.has(id) &&
+              !this.gossipsub.direct.has(id) &&
+              (!backoff || !backoff.has(id)) &&
+              getScore(id) >= 0
+            );
+          }
+        );
+
+        peersSet.forEach(graftPeer);
+      }
+
+      // do we have to many peers?
+      if (peers.size > Dhi) {
+        let peersArray = Array.from(peers);
+        // sort by score
+        peersArray.sort((a, b) => getScore(b) - getScore(a));
+        // We keep the first D_score peers by score and the remaining up to D randomly
+        // under the constraint that we keep D_out peers in the mesh (if we have that many)
+        peersArray = peersArray
+          .slice(0, Dscore)
+          .concat(shuffle(peersArray.slice(Dscore)));
+
+        // count the outbound peers we are keeping
+        let outbound = 0;
+        peersArray.slice(0, D).forEach((p) => {
+          if (this.gossipsub.outbound.get(p)) {
+            outbound++;
+          }
+        });
+
+        // if it's less than D_out, bubble up some outbound peers from the random selection
+        if (outbound < Dout) {
+          const rotate = (i: number): void => {
+            // rotate the peersArray to the right and put the ith peer in the front
+            const p = peersArray[i];
+            for (let j = i; j > 0; j--) {
+              peersArray[j] = peersArray[j - 1];
+            }
+            peersArray[0] = p;
+          };
+
+          // first bubble up all outbound peers already in the selection to the front
+          if (outbound > 0) {
+            let ihave = outbound;
+            for (let i = 1; i < D && ihave > 0; i++) {
+              if (this.gossipsub.outbound.get(peersArray[i])) {
+                rotate(i);
+                ihave--;
+              }
+            }
+          }
+
+          // now bubble up enough outbound peers outside the selection to the front
+          let ineed = D - outbound;
+          for (let i = D; i < peersArray.length && ineed > 0; i++) {
+            if (this.gossipsub.outbound.get(peersArray[i])) {
+              rotate(i);
+              ineed--;
+            }
+          }
+        }
+
+        // prune the excess peers
+        peersArray.slice(D).forEach(prunePeer);
+      }
+
+      // do we have enough outbound peers?
+      if (peers.size >= Dlo) {
+        // count the outbound peers we have
+        let outbound = 0;
+        peers.forEach((p) => {
+          if (this.gossipsub.outbound.get(p)) {
+            outbound++;
+          }
+        });
+
+        // if it's less than D_out, select some peers with outbound connections and graft them
+        if (outbound < Dout) {
+          const ineed = Dout - outbound;
+          const backoff = this.gossipsub.backoff.get(topic);
+          getRelayPeers(this.gossipsub, topic, ineed, (id: string): boolean => {
+            // filter our current mesh peers, direct peers, peers we are backing off, peers with negative score
+            return (
+              !peers.has(id) &&
+              !this.gossipsub.direct.has(id) &&
+              (!backoff || !backoff.has(id)) &&
+              getScore(id) >= 0
+            );
+          }).forEach(graftPeer);
+        }
+      }
+
+      // should we try to improve the mesh with opportunistic grafting?
+      if (
+        this.gossipsub.heartbeatTicks %
+          constants.RelayOpportunisticGraftTicks ===
+          0 &&
+        peers.size > 1
+      ) {
+        // Opportunistic grafting works as follows: we check the median score of peers in the
+        // mesh; if this score is below the opportunisticGraftThreshold, we select a few peers at
+        // random with score over the median.
+        // The intention is to (slowly) improve an under performing mesh by introducing good
+        // scoring peers that may have been gossiping at us. This allows us to get out of sticky
+        // situations where we are stuck with poor peers and also recover from churn of good peers.
+
+        // now compute the median peer score in the mesh
+        const peersList = Array.from(peers).sort(
+          (a, b) => getScore(a) - getScore(b)
+        );
+        const medianIndex = peers.size / 2;
+        const medianScore = getScore(peersList[medianIndex]);
+
+        // if the median score is below the threshold, select a better peer (if any) and GRAFT
+        if (
+          medianScore <
+          this.gossipsub._options.scoreThresholds.opportunisticGraftThreshold
+        ) {
+          const backoff = this.gossipsub.backoff.get(topic);
+          const peersToGraft = getRelayPeers(
+            this.gossipsub,
+            topic,
+            constants.RelayOpportunisticGraftPeers,
+            (id: string): boolean => {
+              // filter out current mesh peers, direct peers, peers we are backing off, peers below or at threshold
+              return (
+                peers.has(id) &&
+                !this.gossipsub.direct.has(id) &&
+                (!backoff || !backoff.has(id)) &&
+                getScore(id) > medianScore
+              );
+            }
+          );
+          peersToGraft.forEach((id: string) => {
+            this.gossipsub.log(
+              'HEARTBEAT: Opportunistically graft peer %s on topic %s',
+              id,
+              topic
+            );
+            graftPeer(id);
+          });
+        }
+      }
+
+      // 2nd arg are mesh peers excluded from gossip. We have already pushed
+      // messages to them, so its redundant to gossip IHAVEs.
+      this.gossipsub._emitGossip(topic, peers);
+    });
+
+    // expire fanout for topics we haven't published to in a while
+    const now = this.gossipsub._now();
+    this.gossipsub.lastpub.forEach((lastpub, topic) => {
+      if (lastpub + constants.RelayFanoutTTL < now) {
+        this.gossipsub.fanout.delete(topic);
+        this.gossipsub.lastpub.delete(topic);
+      }
+    });
+
+    // maintain our fanout for topics we are publishing but we have not joined
+    this.gossipsub.fanout.forEach((fanoutPeers, topic) => {
+      // checks whether our peers are still in the topic and have a score above the publish threshold
+      const topicPeers = this.gossipsub.topics.get(topic);
+      fanoutPeers.forEach((id) => {
+        if (
+          !topicPeers!.has(id) ||
+          getScore(id) <
+            this.gossipsub._options.scoreThresholds.publishThreshold
+        ) {
+          fanoutPeers.delete(id);
+        }
+      });
+
+      // do we need more peers?
+      if (fanoutPeers.size < D) {
+        const ineed = D - fanoutPeers.size;
+        const peersSet = getRelayPeers(
+          this.gossipsub,
+          topic,
+          ineed,
+          (id: string): boolean => {
+            // filter out existing fanout peers, direct peers, and peers with score above the publish threshold
+            return (
+              !fanoutPeers.has(id) &&
+              !this.gossipsub.direct.has(id) &&
+              getScore(id) >=
+                this.gossipsub._options.scoreThresholds.publishThreshold
+            );
+          }
+        );
+        peersSet.forEach((id: string) => {
+          fanoutPeers.add(id);
+        });
+      }
+
+      // 2nd arg are fanout peers excluded from gossip.
+      // We have already pushed messages to them, so its redundant to gossip IHAVEs
+      this.gossipsub._emitGossip(topic, fanoutPeers);
+    });
+
+    // send coalesced GRAFT/PRUNE messages (will piggyback gossip)
+    this.gossipsub._sendGraftPrune(toGraft, toPrune, noPX);
+
+    // flush pending gossip that wasn't piggybacked above
+    this.gossipsub._flush();
+
+    // advance the message history window
+    this.gossipsub.messageCache.shift();
+
+    this.gossipsub.emit('gossipsub:heartbeat');
+  }
+}
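To summarise the mesh maintenance above: each heartbeat tops the mesh up with grafts when it falls below `Dlo` and trims it back to `D` with prunes when it grows past `Dhi`, while trying to keep at least `Dout` of the kept peers outbound. A small illustration with assumed values (the real `D`, `Dlo`, `Dhi` come from `this.gossipsub._options` and are not set in this diff):

```typescript
// Illustration only, with assumed option values D = 6, Dlo = 4, Dhi = 12.
const D = 6;
const Dlo = 4;
const Dhi = 12;

// Positive result: number of peers to graft; negative: number to prune.
function meshAdjustment(meshSize: number): number {
  if (meshSize < Dlo) {
    return D - meshSize; // "do we have enough peers?" branch grafts up to D
  }
  if (meshSize > Dhi) {
    return D - meshSize; // excess branch prunes back down to D
  }
  return 0;
}

console.log(meshAdjustment(2)); // 4  -> graft 4 peers
console.log(meshAdjustment(15)); // -9 -> prune 9 peers
```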
test_utils/index (new file):

@@ -0,0 +1,5 @@
+export * from './async_fs';
+export * from './constants';
+export * from './delay';
+export * from './log_file';
+export * from './nim_waku';
test_utils/nim_waku:

@@ -8,7 +8,7 @@ import multiaddr from 'multiaddr';
 import PeerId from 'peer-id';

 import { WakuMessage } from '../lib/waku_message';
-import { TOPIC } from '../lib/waku_relay';
+import { RelayDefaultTopic } from '../lib/waku_relay';

 import { existsAsync, mkdirAsync, openAsync } from './async_fs';
 import waitForLine from './log_file';

@@ -145,7 +145,7 @@ export class NimWaku {
     };

     const res = await this.rpcCall('post_waku_v2_relay_v1_message', [
-      TOPIC,
+      RelayDefaultTopic,
       rpcMessage,
     ]);

@@ -155,7 +155,9 @@ export class NimWaku {
   async messages() {
     this.checkProcess();

-    const res = await this.rpcCall('get_waku_v2_relay_v1_messages', [TOPIC]);
+    const res = await this.rpcCall('get_waku_v2_relay_v1_messages', [
+      RelayDefaultTopic,
+    ]);

     return res.result;
   }
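For reference, `rpcCall` above targets nim-waku's JSON-RPC API; a hedged sketch of the request body for the `messages()` call, assuming standard JSON-RPC 2.0 framing (the framing inside `rpcCall` itself is not shown in this diff):

```typescript
import { RelayDefaultTopic } from '../lib/waku_relay';

// Assumed JSON-RPC 2.0 framing for the get_waku_v2_relay_v1_messages call above.
const request = {
  jsonrpc: '2.0',
  id: 1,
  method: 'get_waku_v2_relay_v1_messages',
  params: [RelayDefaultTopic],
};

console.log(JSON.stringify(request));
```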