From 7bd48b6220e935408021896c39ed9c30bad65ed2 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 1 Apr 2021 16:37:05 +1100 Subject: [PATCH] Move all relay files under common folder --- src/lib/waku_relay.ts | 181 ----------------- src/lib/{ => waku_relay}/get_waku_peers.ts | 4 +- .../index.spec.ts} | 24 +-- src/lib/waku_relay/index.ts | 183 ++++++++++++++++++ src/lib/waku_relay/relay_heartbeat.ts | 30 +-- 5 files changed, 215 insertions(+), 207 deletions(-) delete mode 100644 src/lib/waku_relay.ts rename src/lib/{ => waku_relay}/get_waku_peers.ts (92%) rename src/lib/{waku_relay.spec.ts => waku_relay/index.spec.ts} (95%) diff --git a/src/lib/waku_relay.ts b/src/lib/waku_relay.ts deleted file mode 100644 index 87d2727281..0000000000 --- a/src/lib/waku_relay.ts +++ /dev/null @@ -1,181 +0,0 @@ -import Gossipsub from 'libp2p-gossipsub'; -import { Libp2p } from 'libp2p-gossipsub/src/interfaces'; -import { createGossipRpc, messageIdToString } from 'libp2p-gossipsub/src/utils'; -import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; -import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; - -import { getWakuPeers } from './get_waku_peers'; -import { WakuMessage } from './waku_message'; -import { RelayHeartbeat } from './waku_relay/relay_heartbeat'; - -export const CODEC = '/vac/waku/relay/2.0.0-beta2'; - -// As per waku specs, the topic is fixed, value taken from nim-waku -export const TOPIC = '/waku/2/default-waku/proto'; - -// This is the class to pass to libp2p as pubsub protocol -export class WakuRelayPubsub extends Gossipsub { - heartbeat: RelayHeartbeat; - - /** - * - * @param libp2p: Libp2p - */ - constructor(libp2p: Libp2p) { - super(libp2p, { - emitSelf: false, - // Ensure that no signature is expected in the messages. - globalSignaturePolicy: SignaturePolicy.StrictNoSign, - }); - - this.heartbeat = new RelayHeartbeat(this); - - const multicodecs = [CODEC]; - - // This is the downside of using `libp2p-gossipsub` instead of - // implementing WakuRelay from scratch. - Object.assign(this, { multicodecs }); - } - - /** - * Join topic - * @param {string} topic - * @returns {void} - * @override - */ - join(topic: string): void { - if (!this.started) { - throw new Error('WakuRelayPubsub has not started'); - } - - const fanoutPeers = this.fanout.get(topic); - if (fanoutPeers) { - // these peers have a score above the publish threshold, which may be negative - // so drop the ones with a negative score - fanoutPeers.forEach((id) => { - if (this.score.score(id) < 0) { - fanoutPeers.delete(id); - } - }); - if (fanoutPeers.size < this._options.D) { - // we need more peers; eager, as this would get fixed in the next heartbeat - getWakuPeers( - this, - topic, - this._options.D - fanoutPeers.size, - (id: string): boolean => { - // filter our current peers, direct peers, and peers with negative scores - return ( - !fanoutPeers.has(id) && - !this.direct.has(id) && - this.score.score(id) >= 0 - ); - } - ).forEach((id) => fanoutPeers.add(id)); - } - this.mesh.set(topic, fanoutPeers); - this.fanout.delete(topic); - this.lastpub.delete(topic); - } else { - const peers = getWakuPeers( - this, - topic, - this._options.D, - (id: string): boolean => { - // filter direct peers and peers with negative score - return !this.direct.has(id) && this.score.score(id) >= 0; - } - ); - this.mesh.set(topic, peers); - } - this.mesh.get(topic)!.forEach((id) => { - this.log('JOIN: Add mesh link to %s in %s', id, topic); - this._sendGraft(id, topic); - }); - } - - /** - * Publish messages - * - * @override - * @param {InMessage} msg - * @returns {void} - */ - async _publish(msg: InMessage): Promise { - if (msg.receivedFrom !== this.peerId.toB58String()) { - this.score.deliverMessage(msg); - this.gossipTracer.deliverMessage(msg); - } - - const msgID = this.getMsgId(msg); - const msgIdStr = messageIdToString(msgID); - // put in seen cache - this.seenCache.put(msgIdStr); - - this.messageCache.put(msg); - - const toSend = new Set(); - msg.topicIDs.forEach((topic) => { - const peersInTopic = this.topics.get(topic); - if (!peersInTopic) { - return; - } - - // direct peers - this.direct.forEach((id) => { - toSend.add(id); - }); - - let meshPeers = this.mesh.get(topic); - if (!meshPeers || !meshPeers.size) { - // We are not in the mesh for topic, use fanout peers - meshPeers = this.fanout.get(topic); - if (!meshPeers) { - // If we are not in the fanout, then pick peers in topic above the publishThreshold - const peers = getWakuPeers(this, topic, this._options.D, (id) => { - return ( - this.score.score(id) >= - this._options.scoreThresholds.publishThreshold - ); - }); - - if (peers.size > 0) { - meshPeers = peers; - this.fanout.set(topic, peers); - } else { - meshPeers = new Set(); - } - } - // Store the latest publishing time - this.lastpub.set(topic, this._now()); - } - - meshPeers!.forEach((peer) => { - toSend.add(peer); - }); - }); - // Publish messages to peers - const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]); - toSend.forEach((id) => { - if (id === msg.from) { - return; - } - this._sendRpc(id, rpc); - }); - } -} - -// This class provides an interface to execute the waku relay protocol -export class WakuRelay { - constructor(private pubsub: Pubsub) {} - - // At this stage we are always using the same topic so we do not pass it as a parameter - async subscribe() { - await this.pubsub.subscribe(TOPIC); - } - - async publish(message: WakuMessage) { - const msg = message.toBinary(); - await this.pubsub.publish(TOPIC, msg); - } -} diff --git a/src/lib/get_waku_peers.ts b/src/lib/waku_relay/get_waku_peers.ts similarity index 92% rename from src/lib/get_waku_peers.ts rename to src/lib/waku_relay/get_waku_peers.ts index 43ebbadc70..9dd788a955 100644 --- a/src/lib/get_waku_peers.ts +++ b/src/lib/waku_relay/get_waku_peers.ts @@ -1,6 +1,6 @@ import { shuffle } from 'libp2p-gossipsub/src/utils'; -import { CODEC, WakuRelayPubsub } from './waku_relay'; +import { CODEC, WakuRelayPubsub } from './index'; /** * Given a topic, returns up to count peers subscribed to that topic @@ -27,7 +27,7 @@ export function getWakuPeers( // Adds all peers using our protocol // that also pass the filter function let peers: string[] = []; - peersInTopic.forEach((id) => { + peersInTopic.forEach((id: string) => { const peerStreams = router.peers.get(id); if (!peerStreams) { return; diff --git a/src/lib/waku_relay.spec.ts b/src/lib/waku_relay/index.spec.ts similarity index 95% rename from src/lib/waku_relay.spec.ts rename to src/lib/waku_relay/index.spec.ts index cbb6d14729..6daab699a4 100644 --- a/src/lib/waku_relay.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -1,14 +1,14 @@ import { expect } from 'chai'; import Pubsub from 'libp2p-interfaces/src/pubsub'; -import { NOISE_KEY_1, NOISE_KEY_2 } from '../test_utils/constants'; -import { delay } from '../test_utils/delay'; -import { makeLogFileName } from '../test_utils/log_file'; -import { NimWaku } from '../test_utils/nim_waku'; +import { NOISE_KEY_1, NOISE_KEY_2 } from '../../test_utils/constants'; +import { delay } from '../../test_utils/delay'; +import { makeLogFileName } from '../../test_utils/log_file'; +import { NimWaku } from '../../test_utils/nim_waku'; +import Waku from '../waku'; +import { WakuMessage } from '../waku_message'; -import Waku from './waku'; -import { WakuMessage } from './waku_message'; -import { CODEC, TOPIC } from './waku_relay'; +import { CODEC, TOPIC } from './index'; describe('Waku Relay', () => { afterEach(function () { @@ -41,13 +41,15 @@ describe('Waku Relay', () => { await Promise.all([ new Promise((resolve) => - waku1.libp2p.pubsub.once('pubsub:subscription-change', (...args) => - resolve(args) + waku1.libp2p.pubsub.once( + 'pubsub:subscription-change', + (...args: any[]) => resolve(args) ) ), new Promise((resolve) => - waku2.libp2p.pubsub.once('pubsub:subscription-change', (...args) => - resolve(args) + waku2.libp2p.pubsub.once( + 'pubsub:subscription-change', + (...args: any[]) => resolve(args) ) ), ]); diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index e93d868e54..cd507ed797 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -1,2 +1,185 @@ +import Gossipsub from 'libp2p-gossipsub'; +import { Libp2p } from 'libp2p-gossipsub/src/interfaces'; +import { createGossipRpc, messageIdToString } from 'libp2p-gossipsub/src/utils'; +import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; +import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; + +import { WakuMessage } from '../waku_message'; + +import { getWakuPeers } from './get_waku_peers'; +import { RelayHeartbeat } from './relay_heartbeat'; + export * from './constants'; export * from './relay_heartbeat'; + +export const CODEC = '/vac/waku/relay/2.0.0-beta2'; + +// As per waku specs, the topic is fixed, value taken from nim-waku +export const TOPIC = '/waku/2/default-waku/proto'; + +// This is the class to pass to libp2p as pubsub protocol +export class WakuRelayPubsub extends Gossipsub { + heartbeat: RelayHeartbeat; + + /** + * + * @param libp2p: Libp2p + */ + constructor(libp2p: Libp2p) { + super(libp2p, { + emitSelf: false, + // Ensure that no signature is expected in the messages. + globalSignaturePolicy: SignaturePolicy.StrictNoSign, + }); + + this.heartbeat = new RelayHeartbeat(this); + + const multicodecs = [CODEC]; + + // This is the downside of using `libp2p-gossipsub` instead of + // implementing WakuRelay from scratch. + Object.assign(this, { multicodecs }); + } + + /** + * Join topic + * @param {string} topic + * @returns {void} + * @override + */ + join(topic: string): void { + if (!this.started) { + throw new Error('WakuRelayPubsub has not started'); + } + + const fanoutPeers = this.fanout.get(topic); + if (fanoutPeers) { + // these peers have a score above the publish threshold, which may be negative + // so drop the ones with a negative score + fanoutPeers.forEach((id) => { + if (this.score.score(id) < 0) { + fanoutPeers.delete(id); + } + }); + if (fanoutPeers.size < this._options.D) { + // we need more peers; eager, as this would get fixed in the next heartbeat + getWakuPeers( + this, + topic, + this._options.D - fanoutPeers.size, + (id: string): boolean => { + // filter our current peers, direct peers, and peers with negative scores + return ( + !fanoutPeers.has(id) && + !this.direct.has(id) && + this.score.score(id) >= 0 + ); + } + ).forEach((id) => fanoutPeers.add(id)); + } + this.mesh.set(topic, fanoutPeers); + this.fanout.delete(topic); + this.lastpub.delete(topic); + } else { + const peers = getWakuPeers( + this, + topic, + this._options.D, + (id: string): boolean => { + // filter direct peers and peers with negative score + return !this.direct.has(id) && this.score.score(id) >= 0; + } + ); + this.mesh.set(topic, peers); + } + this.mesh.get(topic)!.forEach((id) => { + this.log('JOIN: Add mesh link to %s in %s', id, topic); + this._sendGraft(id, topic); + }); + } + + /** + * Publish messages + * + * @override + * @param {InMessage} msg + * @returns {void} + */ + async _publish(msg: InMessage): Promise { + if (msg.receivedFrom !== this.peerId.toB58String()) { + this.score.deliverMessage(msg); + this.gossipTracer.deliverMessage(msg); + } + + const msgID = this.getMsgId(msg); + const msgIdStr = messageIdToString(msgID); + // put in seen cache + this.seenCache.put(msgIdStr); + + this.messageCache.put(msg); + + const toSend = new Set(); + msg.topicIDs.forEach((topic) => { + const peersInTopic = this.topics.get(topic); + if (!peersInTopic) { + return; + } + + // direct peers + this.direct.forEach((id) => { + toSend.add(id); + }); + + let meshPeers = this.mesh.get(topic); + if (!meshPeers || !meshPeers.size) { + // We are not in the mesh for topic, use fanout peers + meshPeers = this.fanout.get(topic); + if (!meshPeers) { + // If we are not in the fanout, then pick peers in topic above the publishThreshold + const peers = getWakuPeers(this, topic, this._options.D, (id) => { + return ( + this.score.score(id) >= + this._options.scoreThresholds.publishThreshold + ); + }); + + if (peers.size > 0) { + meshPeers = peers; + this.fanout.set(topic, peers); + } else { + meshPeers = new Set(); + } + } + // Store the latest publishing time + this.lastpub.set(topic, this._now()); + } + + meshPeers!.forEach((peer) => { + toSend.add(peer); + }); + }); + // Publish messages to peers + const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]); + toSend.forEach((id) => { + if (id === msg.from) { + return; + } + this._sendRpc(id, rpc); + }); + } +} + +// This class provides an interface to execute the waku relay protocol +export class WakuRelay { + constructor(private pubsub: Pubsub) {} + + // At this stage we are always using the same topic so we do not pass it as a parameter + async subscribe() { + await this.pubsub.subscribe(TOPIC); + } + + async publish(message: WakuMessage) { + const msg = message.toBinary(); + await this.pubsub.publish(TOPIC, msg); + } +} diff --git a/src/lib/waku_relay/relay_heartbeat.ts b/src/lib/waku_relay/relay_heartbeat.ts index a910a56f8b..0842bfee6a 100644 --- a/src/lib/waku_relay/relay_heartbeat.ts +++ b/src/lib/waku_relay/relay_heartbeat.ts @@ -2,9 +2,8 @@ import Gossipsub from 'libp2p-gossipsub'; import { Heartbeat } from 'libp2p-gossipsub/src/heartbeat'; import { shuffle } from 'libp2p-gossipsub/src/utils'; -import { getWakuPeers } from '../get_waku_peers'; - import * as constants from './constants'; +import { getWakuPeers } from './get_waku_peers'; export class RelayHeartbeat extends Heartbeat { /** @@ -153,15 +152,20 @@ export class RelayHeartbeat extends Heartbeat { if (peers.size < Dlo) { const backoff = this.gossipsub.backoff.get(topic); const ineed = D - peers.size; - const peersSet = getWakuPeers(this.gossipsub, topic, ineed, (id) => { - // filter out mesh peers, direct peers, peers we are backing off, peers with negative score - return ( - !peers.has(id) && - !this.gossipsub.direct.has(id) && - (!backoff || !backoff.has(id)) && - getScore(id) >= 0 - ); - }); + const peersSet = getWakuPeers( + this.gossipsub, + topic, + ineed, + (id: string) => { + // filter out mesh peers, direct peers, peers we are backing off, peers with negative score + return ( + !peers.has(id) && + !this.gossipsub.direct.has(id) && + (!backoff || !backoff.has(id)) && + getScore(id) >= 0 + ); + } + ); peersSet.forEach(graftPeer); } @@ -288,7 +292,7 @@ export class RelayHeartbeat extends Heartbeat { ); } ); - peersToGraft.forEach((id) => { + peersToGraft.forEach((id: string) => { this.gossipsub.log( 'HEARTBEAT: Opportunistically graft peer %s on topic %s', id, @@ -344,7 +348,7 @@ export class RelayHeartbeat extends Heartbeat { ); } ); - peersSet.forEach((id) => { + peersSet.forEach((id: string) => { fanoutPeers.add(id); }); }