diff --git a/src/lib/waku_relay.spec.ts b/src/lib/waku_relay.spec.ts index bedcacd766..cbb6d14729 100644 --- a/src/lib/waku_relay.spec.ts +++ b/src/lib/waku_relay.spec.ts @@ -73,24 +73,13 @@ describe('Waku Relay', () => { expect(protocols.findIndex((value) => value.match(/sub/))).to.eq(-1); }); - // TODO: Fix this - it.skip('Publish', async function () { + it('Publish', async function () { this.timeout(10000); const message = WakuMessage.fromUtf8String('JS to JS communication works'); - // waku.libp2p.pubsub.globalSignaturePolicy = 'StrictSign'; const receivedPromise = waitForNextData(waku2.libp2p.pubsub); - await Promise.all([ - new Promise((resolve) => - waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ), - new Promise((resolve) => - waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve) - ), - ]); - await waku1.relay.publish(message); const receivedMsg = await receivedPromise; diff --git a/src/lib/waku_relay.ts b/src/lib/waku_relay.ts index 1069e1f159..7e10757f71 100644 --- a/src/lib/waku_relay.ts +++ b/src/lib/waku_relay.ts @@ -1,6 +1,7 @@ import Gossipsub from 'libp2p-gossipsub'; import { Libp2p } from 'libp2p-gossipsub/src/interfaces'; -import Pubsub from 'libp2p-interfaces/src/pubsub'; +import { createGossipRpc, messageIdToString } from 'libp2p-gossipsub/src/utils'; +import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import { getWakuPeers } from './get_waku_peers'; @@ -87,9 +88,78 @@ export class WakuRelayPubsub extends Gossipsub { this._sendGraft(id, topic); }); } + + /** + * Publish messages + * + * @override + * @param {InMessage} msg + * @returns {void} + */ + async _publish(msg: InMessage): Promise { + if (msg.receivedFrom !== this.peerId.toB58String()) { + this.score.deliverMessage(msg); + this.gossipTracer.deliverMessage(msg); + } + + const msgID = this.getMsgId(msg); + const msgIdStr = messageIdToString(msgID); + // put in seen cache + this.seenCache.put(msgIdStr); + + this.messageCache.put(msg); + + const toSend = new Set(); + msg.topicIDs.forEach((topic) => { + const peersInTopic = this.topics.get(topic); + if (!peersInTopic) { + return; + } + + // direct peers + this.direct.forEach((id) => { + toSend.add(id); + }); + + let meshPeers = this.mesh.get(topic); + if (!meshPeers || !meshPeers.size) { + // We are not in the mesh for topic, use fanout peers + meshPeers = this.fanout.get(topic); + if (!meshPeers) { + // If we are not in the fanout, then pick peers in topic above the publishThreshold + const peers = getWakuPeers(this, topic, this._options.D, (id) => { + return ( + this.score.score(id) >= + this._options.scoreThresholds.publishThreshold + ); + }); + + if (peers.size > 0) { + meshPeers = peers; + this.fanout.set(topic, peers); + } else { + meshPeers = new Set(); + } + } + // Store the latest publishing time + this.lastpub.set(topic, this._now()); + } + + meshPeers!.forEach((peer) => { + toSend.add(peer); + }); + }); + // Publish messages to peers + const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]); + toSend.forEach((id) => { + if (id === msg.from) { + return; + } + this._sendRpc(id, rpc); + }); + } } -// TODO: Implement dial for an address with format '/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAkyzsXzENw5XBDYEQQAeQTCYjBJpMLgBmEXuwbtcrgxBJ4' // This class provides an interface to execute the waku relay protocol export class WakuRelay { constructor(private pubsub: Pubsub) {}