mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-07 16:23:09 +00:00
Move all relay files under common folder
This commit is contained in:
parent
ed5a363096
commit
7bd48b6220
@ -1,181 +0,0 @@
|
|||||||
import Gossipsub from 'libp2p-gossipsub';
|
|
||||||
import { Libp2p } from 'libp2p-gossipsub/src/interfaces';
|
|
||||||
import { createGossipRpc, messageIdToString } from 'libp2p-gossipsub/src/utils';
|
|
||||||
import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub';
|
|
||||||
import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
|
|
||||||
|
|
||||||
import { getWakuPeers } from './get_waku_peers';
|
|
||||||
import { WakuMessage } from './waku_message';
|
|
||||||
import { RelayHeartbeat } from './waku_relay/relay_heartbeat';
|
|
||||||
|
|
||||||
export const CODEC = '/vac/waku/relay/2.0.0-beta2';
|
|
||||||
|
|
||||||
// As per waku specs, the topic is fixed, value taken from nim-waku
|
|
||||||
export const TOPIC = '/waku/2/default-waku/proto';
|
|
||||||
|
|
||||||
// This is the class to pass to libp2p as pubsub protocol
|
|
||||||
export class WakuRelayPubsub extends Gossipsub {
|
|
||||||
heartbeat: RelayHeartbeat;
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* @param libp2p: Libp2p
|
|
||||||
*/
|
|
||||||
constructor(libp2p: Libp2p) {
|
|
||||||
super(libp2p, {
|
|
||||||
emitSelf: false,
|
|
||||||
// Ensure that no signature is expected in the messages.
|
|
||||||
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
|
|
||||||
});
|
|
||||||
|
|
||||||
this.heartbeat = new RelayHeartbeat(this);
|
|
||||||
|
|
||||||
const multicodecs = [CODEC];
|
|
||||||
|
|
||||||
// This is the downside of using `libp2p-gossipsub` instead of
|
|
||||||
// implementing WakuRelay from scratch.
|
|
||||||
Object.assign(this, { multicodecs });
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Join topic
|
|
||||||
* @param {string} topic
|
|
||||||
* @returns {void}
|
|
||||||
* @override
|
|
||||||
*/
|
|
||||||
join(topic: string): void {
|
|
||||||
if (!this.started) {
|
|
||||||
throw new Error('WakuRelayPubsub has not started');
|
|
||||||
}
|
|
||||||
|
|
||||||
const fanoutPeers = this.fanout.get(topic);
|
|
||||||
if (fanoutPeers) {
|
|
||||||
// these peers have a score above the publish threshold, which may be negative
|
|
||||||
// so drop the ones with a negative score
|
|
||||||
fanoutPeers.forEach((id) => {
|
|
||||||
if (this.score.score(id) < 0) {
|
|
||||||
fanoutPeers.delete(id);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
if (fanoutPeers.size < this._options.D) {
|
|
||||||
// we need more peers; eager, as this would get fixed in the next heartbeat
|
|
||||||
getWakuPeers(
|
|
||||||
this,
|
|
||||||
topic,
|
|
||||||
this._options.D - fanoutPeers.size,
|
|
||||||
(id: string): boolean => {
|
|
||||||
// filter our current peers, direct peers, and peers with negative scores
|
|
||||||
return (
|
|
||||||
!fanoutPeers.has(id) &&
|
|
||||||
!this.direct.has(id) &&
|
|
||||||
this.score.score(id) >= 0
|
|
||||||
);
|
|
||||||
}
|
|
||||||
).forEach((id) => fanoutPeers.add(id));
|
|
||||||
}
|
|
||||||
this.mesh.set(topic, fanoutPeers);
|
|
||||||
this.fanout.delete(topic);
|
|
||||||
this.lastpub.delete(topic);
|
|
||||||
} else {
|
|
||||||
const peers = getWakuPeers(
|
|
||||||
this,
|
|
||||||
topic,
|
|
||||||
this._options.D,
|
|
||||||
(id: string): boolean => {
|
|
||||||
// filter direct peers and peers with negative score
|
|
||||||
return !this.direct.has(id) && this.score.score(id) >= 0;
|
|
||||||
}
|
|
||||||
);
|
|
||||||
this.mesh.set(topic, peers);
|
|
||||||
}
|
|
||||||
this.mesh.get(topic)!.forEach((id) => {
|
|
||||||
this.log('JOIN: Add mesh link to %s in %s', id, topic);
|
|
||||||
this._sendGraft(id, topic);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Publish messages
|
|
||||||
*
|
|
||||||
* @override
|
|
||||||
* @param {InMessage} msg
|
|
||||||
* @returns {void}
|
|
||||||
*/
|
|
||||||
async _publish(msg: InMessage): Promise<void> {
|
|
||||||
if (msg.receivedFrom !== this.peerId.toB58String()) {
|
|
||||||
this.score.deliverMessage(msg);
|
|
||||||
this.gossipTracer.deliverMessage(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
const msgID = this.getMsgId(msg);
|
|
||||||
const msgIdStr = messageIdToString(msgID);
|
|
||||||
// put in seen cache
|
|
||||||
this.seenCache.put(msgIdStr);
|
|
||||||
|
|
||||||
this.messageCache.put(msg);
|
|
||||||
|
|
||||||
const toSend = new Set<string>();
|
|
||||||
msg.topicIDs.forEach((topic) => {
|
|
||||||
const peersInTopic = this.topics.get(topic);
|
|
||||||
if (!peersInTopic) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// direct peers
|
|
||||||
this.direct.forEach((id) => {
|
|
||||||
toSend.add(id);
|
|
||||||
});
|
|
||||||
|
|
||||||
let meshPeers = this.mesh.get(topic);
|
|
||||||
if (!meshPeers || !meshPeers.size) {
|
|
||||||
// We are not in the mesh for topic, use fanout peers
|
|
||||||
meshPeers = this.fanout.get(topic);
|
|
||||||
if (!meshPeers) {
|
|
||||||
// If we are not in the fanout, then pick peers in topic above the publishThreshold
|
|
||||||
const peers = getWakuPeers(this, topic, this._options.D, (id) => {
|
|
||||||
return (
|
|
||||||
this.score.score(id) >=
|
|
||||||
this._options.scoreThresholds.publishThreshold
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
if (peers.size > 0) {
|
|
||||||
meshPeers = peers;
|
|
||||||
this.fanout.set(topic, peers);
|
|
||||||
} else {
|
|
||||||
meshPeers = new Set();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Store the latest publishing time
|
|
||||||
this.lastpub.set(topic, this._now());
|
|
||||||
}
|
|
||||||
|
|
||||||
meshPeers!.forEach((peer) => {
|
|
||||||
toSend.add(peer);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
// Publish messages to peers
|
|
||||||
const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]);
|
|
||||||
toSend.forEach((id) => {
|
|
||||||
if (id === msg.from) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this._sendRpc(id, rpc);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This class provides an interface to execute the waku relay protocol
|
|
||||||
export class WakuRelay {
|
|
||||||
constructor(private pubsub: Pubsub) {}
|
|
||||||
|
|
||||||
// At this stage we are always using the same topic so we do not pass it as a parameter
|
|
||||||
async subscribe() {
|
|
||||||
await this.pubsub.subscribe(TOPIC);
|
|
||||||
}
|
|
||||||
|
|
||||||
async publish(message: WakuMessage) {
|
|
||||||
const msg = message.toBinary();
|
|
||||||
await this.pubsub.publish(TOPIC, msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,6 +1,6 @@
|
|||||||
import { shuffle } from 'libp2p-gossipsub/src/utils';
|
import { shuffle } from 'libp2p-gossipsub/src/utils';
|
||||||
|
|
||||||
import { CODEC, WakuRelayPubsub } from './waku_relay';
|
import { CODEC, WakuRelayPubsub } from './index';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a topic, returns up to count peers subscribed to that topic
|
* Given a topic, returns up to count peers subscribed to that topic
|
||||||
@ -27,7 +27,7 @@ export function getWakuPeers(
|
|||||||
// Adds all peers using our protocol
|
// Adds all peers using our protocol
|
||||||
// that also pass the filter function
|
// that also pass the filter function
|
||||||
let peers: string[] = [];
|
let peers: string[] = [];
|
||||||
peersInTopic.forEach((id) => {
|
peersInTopic.forEach((id: string) => {
|
||||||
const peerStreams = router.peers.get(id);
|
const peerStreams = router.peers.get(id);
|
||||||
if (!peerStreams) {
|
if (!peerStreams) {
|
||||||
return;
|
return;
|
||||||
@ -1,14 +1,14 @@
|
|||||||
import { expect } from 'chai';
|
import { expect } from 'chai';
|
||||||
import Pubsub from 'libp2p-interfaces/src/pubsub';
|
import Pubsub from 'libp2p-interfaces/src/pubsub';
|
||||||
|
|
||||||
import { NOISE_KEY_1, NOISE_KEY_2 } from '../test_utils/constants';
|
import { NOISE_KEY_1, NOISE_KEY_2 } from '../../test_utils/constants';
|
||||||
import { delay } from '../test_utils/delay';
|
import { delay } from '../../test_utils/delay';
|
||||||
import { makeLogFileName } from '../test_utils/log_file';
|
import { makeLogFileName } from '../../test_utils/log_file';
|
||||||
import { NimWaku } from '../test_utils/nim_waku';
|
import { NimWaku } from '../../test_utils/nim_waku';
|
||||||
|
import Waku from '../waku';
|
||||||
|
import { WakuMessage } from '../waku_message';
|
||||||
|
|
||||||
import Waku from './waku';
|
import { CODEC, TOPIC } from './index';
|
||||||
import { WakuMessage } from './waku_message';
|
|
||||||
import { CODEC, TOPIC } from './waku_relay';
|
|
||||||
|
|
||||||
describe('Waku Relay', () => {
|
describe('Waku Relay', () => {
|
||||||
afterEach(function () {
|
afterEach(function () {
|
||||||
@ -41,13 +41,15 @@ describe('Waku Relay', () => {
|
|||||||
|
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
new Promise((resolve) =>
|
new Promise((resolve) =>
|
||||||
waku1.libp2p.pubsub.once('pubsub:subscription-change', (...args) =>
|
waku1.libp2p.pubsub.once(
|
||||||
resolve(args)
|
'pubsub:subscription-change',
|
||||||
|
(...args: any[]) => resolve(args)
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
new Promise((resolve) =>
|
new Promise((resolve) =>
|
||||||
waku2.libp2p.pubsub.once('pubsub:subscription-change', (...args) =>
|
waku2.libp2p.pubsub.once(
|
||||||
resolve(args)
|
'pubsub:subscription-change',
|
||||||
|
(...args: any[]) => resolve(args)
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
]);
|
]);
|
||||||
@ -1,2 +1,185 @@
|
|||||||
|
import Gossipsub from 'libp2p-gossipsub';
|
||||||
|
import { Libp2p } from 'libp2p-gossipsub/src/interfaces';
|
||||||
|
import { createGossipRpc, messageIdToString } from 'libp2p-gossipsub/src/utils';
|
||||||
|
import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub';
|
||||||
|
import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
|
||||||
|
|
||||||
|
import { WakuMessage } from '../waku_message';
|
||||||
|
|
||||||
|
import { getWakuPeers } from './get_waku_peers';
|
||||||
|
import { RelayHeartbeat } from './relay_heartbeat';
|
||||||
|
|
||||||
export * from './constants';
|
export * from './constants';
|
||||||
export * from './relay_heartbeat';
|
export * from './relay_heartbeat';
|
||||||
|
|
||||||
|
export const CODEC = '/vac/waku/relay/2.0.0-beta2';
|
||||||
|
|
||||||
|
// As per waku specs, the topic is fixed, value taken from nim-waku
|
||||||
|
export const TOPIC = '/waku/2/default-waku/proto';
|
||||||
|
|
||||||
|
// This is the class to pass to libp2p as pubsub protocol
|
||||||
|
export class WakuRelayPubsub extends Gossipsub {
|
||||||
|
heartbeat: RelayHeartbeat;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param libp2p: Libp2p
|
||||||
|
*/
|
||||||
|
constructor(libp2p: Libp2p) {
|
||||||
|
super(libp2p, {
|
||||||
|
emitSelf: false,
|
||||||
|
// Ensure that no signature is expected in the messages.
|
||||||
|
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
|
||||||
|
});
|
||||||
|
|
||||||
|
this.heartbeat = new RelayHeartbeat(this);
|
||||||
|
|
||||||
|
const multicodecs = [CODEC];
|
||||||
|
|
||||||
|
// This is the downside of using `libp2p-gossipsub` instead of
|
||||||
|
// implementing WakuRelay from scratch.
|
||||||
|
Object.assign(this, { multicodecs });
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Join topic
|
||||||
|
* @param {string} topic
|
||||||
|
* @returns {void}
|
||||||
|
* @override
|
||||||
|
*/
|
||||||
|
join(topic: string): void {
|
||||||
|
if (!this.started) {
|
||||||
|
throw new Error('WakuRelayPubsub has not started');
|
||||||
|
}
|
||||||
|
|
||||||
|
const fanoutPeers = this.fanout.get(topic);
|
||||||
|
if (fanoutPeers) {
|
||||||
|
// these peers have a score above the publish threshold, which may be negative
|
||||||
|
// so drop the ones with a negative score
|
||||||
|
fanoutPeers.forEach((id) => {
|
||||||
|
if (this.score.score(id) < 0) {
|
||||||
|
fanoutPeers.delete(id);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (fanoutPeers.size < this._options.D) {
|
||||||
|
// we need more peers; eager, as this would get fixed in the next heartbeat
|
||||||
|
getWakuPeers(
|
||||||
|
this,
|
||||||
|
topic,
|
||||||
|
this._options.D - fanoutPeers.size,
|
||||||
|
(id: string): boolean => {
|
||||||
|
// filter our current peers, direct peers, and peers with negative scores
|
||||||
|
return (
|
||||||
|
!fanoutPeers.has(id) &&
|
||||||
|
!this.direct.has(id) &&
|
||||||
|
this.score.score(id) >= 0
|
||||||
|
);
|
||||||
|
}
|
||||||
|
).forEach((id) => fanoutPeers.add(id));
|
||||||
|
}
|
||||||
|
this.mesh.set(topic, fanoutPeers);
|
||||||
|
this.fanout.delete(topic);
|
||||||
|
this.lastpub.delete(topic);
|
||||||
|
} else {
|
||||||
|
const peers = getWakuPeers(
|
||||||
|
this,
|
||||||
|
topic,
|
||||||
|
this._options.D,
|
||||||
|
(id: string): boolean => {
|
||||||
|
// filter direct peers and peers with negative score
|
||||||
|
return !this.direct.has(id) && this.score.score(id) >= 0;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
this.mesh.set(topic, peers);
|
||||||
|
}
|
||||||
|
this.mesh.get(topic)!.forEach((id) => {
|
||||||
|
this.log('JOIN: Add mesh link to %s in %s', id, topic);
|
||||||
|
this._sendGraft(id, topic);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publish messages
|
||||||
|
*
|
||||||
|
* @override
|
||||||
|
* @param {InMessage} msg
|
||||||
|
* @returns {void}
|
||||||
|
*/
|
||||||
|
async _publish(msg: InMessage): Promise<void> {
|
||||||
|
if (msg.receivedFrom !== this.peerId.toB58String()) {
|
||||||
|
this.score.deliverMessage(msg);
|
||||||
|
this.gossipTracer.deliverMessage(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
const msgID = this.getMsgId(msg);
|
||||||
|
const msgIdStr = messageIdToString(msgID);
|
||||||
|
// put in seen cache
|
||||||
|
this.seenCache.put(msgIdStr);
|
||||||
|
|
||||||
|
this.messageCache.put(msg);
|
||||||
|
|
||||||
|
const toSend = new Set<string>();
|
||||||
|
msg.topicIDs.forEach((topic) => {
|
||||||
|
const peersInTopic = this.topics.get(topic);
|
||||||
|
if (!peersInTopic) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// direct peers
|
||||||
|
this.direct.forEach((id) => {
|
||||||
|
toSend.add(id);
|
||||||
|
});
|
||||||
|
|
||||||
|
let meshPeers = this.mesh.get(topic);
|
||||||
|
if (!meshPeers || !meshPeers.size) {
|
||||||
|
// We are not in the mesh for topic, use fanout peers
|
||||||
|
meshPeers = this.fanout.get(topic);
|
||||||
|
if (!meshPeers) {
|
||||||
|
// If we are not in the fanout, then pick peers in topic above the publishThreshold
|
||||||
|
const peers = getWakuPeers(this, topic, this._options.D, (id) => {
|
||||||
|
return (
|
||||||
|
this.score.score(id) >=
|
||||||
|
this._options.scoreThresholds.publishThreshold
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (peers.size > 0) {
|
||||||
|
meshPeers = peers;
|
||||||
|
this.fanout.set(topic, peers);
|
||||||
|
} else {
|
||||||
|
meshPeers = new Set();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Store the latest publishing time
|
||||||
|
this.lastpub.set(topic, this._now());
|
||||||
|
}
|
||||||
|
|
||||||
|
meshPeers!.forEach((peer) => {
|
||||||
|
toSend.add(peer);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
// Publish messages to peers
|
||||||
|
const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]);
|
||||||
|
toSend.forEach((id) => {
|
||||||
|
if (id === msg.from) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this._sendRpc(id, rpc);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This class provides an interface to execute the waku relay protocol
|
||||||
|
export class WakuRelay {
|
||||||
|
constructor(private pubsub: Pubsub) {}
|
||||||
|
|
||||||
|
// At this stage we are always using the same topic so we do not pass it as a parameter
|
||||||
|
async subscribe() {
|
||||||
|
await this.pubsub.subscribe(TOPIC);
|
||||||
|
}
|
||||||
|
|
||||||
|
async publish(message: WakuMessage) {
|
||||||
|
const msg = message.toBinary();
|
||||||
|
await this.pubsub.publish(TOPIC, msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -2,9 +2,8 @@ import Gossipsub from 'libp2p-gossipsub';
|
|||||||
import { Heartbeat } from 'libp2p-gossipsub/src/heartbeat';
|
import { Heartbeat } from 'libp2p-gossipsub/src/heartbeat';
|
||||||
import { shuffle } from 'libp2p-gossipsub/src/utils';
|
import { shuffle } from 'libp2p-gossipsub/src/utils';
|
||||||
|
|
||||||
import { getWakuPeers } from '../get_waku_peers';
|
|
||||||
|
|
||||||
import * as constants from './constants';
|
import * as constants from './constants';
|
||||||
|
import { getWakuPeers } from './get_waku_peers';
|
||||||
|
|
||||||
export class RelayHeartbeat extends Heartbeat {
|
export class RelayHeartbeat extends Heartbeat {
|
||||||
/**
|
/**
|
||||||
@ -153,15 +152,20 @@ export class RelayHeartbeat extends Heartbeat {
|
|||||||
if (peers.size < Dlo) {
|
if (peers.size < Dlo) {
|
||||||
const backoff = this.gossipsub.backoff.get(topic);
|
const backoff = this.gossipsub.backoff.get(topic);
|
||||||
const ineed = D - peers.size;
|
const ineed = D - peers.size;
|
||||||
const peersSet = getWakuPeers(this.gossipsub, topic, ineed, (id) => {
|
const peersSet = getWakuPeers(
|
||||||
// filter out mesh peers, direct peers, peers we are backing off, peers with negative score
|
this.gossipsub,
|
||||||
return (
|
topic,
|
||||||
!peers.has(id) &&
|
ineed,
|
||||||
!this.gossipsub.direct.has(id) &&
|
(id: string) => {
|
||||||
(!backoff || !backoff.has(id)) &&
|
// filter out mesh peers, direct peers, peers we are backing off, peers with negative score
|
||||||
getScore(id) >= 0
|
return (
|
||||||
);
|
!peers.has(id) &&
|
||||||
});
|
!this.gossipsub.direct.has(id) &&
|
||||||
|
(!backoff || !backoff.has(id)) &&
|
||||||
|
getScore(id) >= 0
|
||||||
|
);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
peersSet.forEach(graftPeer);
|
peersSet.forEach(graftPeer);
|
||||||
}
|
}
|
||||||
@ -288,7 +292,7 @@ export class RelayHeartbeat extends Heartbeat {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
peersToGraft.forEach((id) => {
|
peersToGraft.forEach((id: string) => {
|
||||||
this.gossipsub.log(
|
this.gossipsub.log(
|
||||||
'HEARTBEAT: Opportunistically graft peer %s on topic %s',
|
'HEARTBEAT: Opportunistically graft peer %s on topic %s',
|
||||||
id,
|
id,
|
||||||
@ -344,7 +348,7 @@ export class RelayHeartbeat extends Heartbeat {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
peersSet.forEach((id) => {
|
peersSet.forEach((id: string) => {
|
||||||
fanoutPeers.add(id);
|
fanoutPeers.add(id);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user