Check against gossipsub protocol was also done in publish

Meaning it would sometimes fail to recognize another waku node at
a subscriber of a topic, depending on timing with the `subscribe`
call.
This commit is contained in:
Franck Royer 2021-04-01 15:47:34 +11:00
parent f35322967d
commit 95b88d2815
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
2 changed files with 73 additions and 14 deletions

View File

@ -73,24 +73,13 @@ describe('Waku Relay', () => {
expect(protocols.findIndex((value) => value.match(/sub/))).to.eq(-1);
});
// TODO: Fix this
it.skip('Publish', async function () {
it('Publish', async function () {
this.timeout(10000);
const message = WakuMessage.fromUtf8String('JS to JS communication works');
// waku.libp2p.pubsub.globalSignaturePolicy = 'StrictSign';
const receivedPromise = waitForNextData(waku2.libp2p.pubsub);
await Promise.all([
new Promise((resolve) =>
waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
new Promise((resolve) =>
waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
]);
await waku1.relay.publish(message);
const receivedMsg = await receivedPromise;

View File

@ -1,6 +1,7 @@
import Gossipsub from 'libp2p-gossipsub';
import { Libp2p } from 'libp2p-gossipsub/src/interfaces';
import Pubsub from 'libp2p-interfaces/src/pubsub';
import { createGossipRpc, messageIdToString } from 'libp2p-gossipsub/src/utils';
import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub';
import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
import { getWakuPeers } from './get_waku_peers';
@ -87,9 +88,78 @@ export class WakuRelayPubsub extends Gossipsub {
this._sendGraft(id, topic);
});
}
/**
* Publish messages
*
* @override
* @param {InMessage} msg
* @returns {void}
*/
async _publish(msg: InMessage): Promise<void> {
if (msg.receivedFrom !== this.peerId.toB58String()) {
this.score.deliverMessage(msg);
this.gossipTracer.deliverMessage(msg);
}
const msgID = this.getMsgId(msg);
const msgIdStr = messageIdToString(msgID);
// put in seen cache
this.seenCache.put(msgIdStr);
this.messageCache.put(msg);
const toSend = new Set<string>();
msg.topicIDs.forEach((topic) => {
const peersInTopic = this.topics.get(topic);
if (!peersInTopic) {
return;
}
// direct peers
this.direct.forEach((id) => {
toSend.add(id);
});
let meshPeers = this.mesh.get(topic);
if (!meshPeers || !meshPeers.size) {
// We are not in the mesh for topic, use fanout peers
meshPeers = this.fanout.get(topic);
if (!meshPeers) {
// If we are not in the fanout, then pick peers in topic above the publishThreshold
const peers = getWakuPeers(this, topic, this._options.D, (id) => {
return (
this.score.score(id) >=
this._options.scoreThresholds.publishThreshold
);
});
if (peers.size > 0) {
meshPeers = peers;
this.fanout.set(topic, peers);
} else {
meshPeers = new Set();
}
}
// Store the latest publishing time
this.lastpub.set(topic, this._now());
}
meshPeers!.forEach((peer) => {
toSend.add(peer);
});
});
// Publish messages to peers
const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]);
toSend.forEach((id) => {
if (id === msg.from) {
return;
}
this._sendRpc(id, rpc);
});
}
}
// TODO: Implement dial for an address with format '/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2HAkyzsXzENw5XBDYEQQAeQTCYjBJpMLgBmEXuwbtcrgxBJ4'
// This class provides an interface to execute the waku relay protocol
export class WakuRelay {
constructor(private pubsub: Pubsub) {}