Add observer interface that directly pass waku messages

This commit is contained in:
Franck Royer 2021-05-10 12:27:20 +10:00
parent ace5f2776f
commit 71a5b23bd2
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
2 changed files with 41 additions and 16 deletions

View File

@ -1,5 +1,4 @@
import { expect } from 'chai'; import { expect } from 'chai';
import Pubsub from 'libp2p-interfaces/src/pubsub';
import TCP from 'libp2p-tcp'; import TCP from 'libp2p-tcp';
import { import {
@ -74,11 +73,13 @@ describe('Waku Relay', () => {
const message = WakuMessage.fromUtf8String('JS to JS communication works'); const message = WakuMessage.fromUtf8String('JS to JS communication works');
const receivedPromise = waitForNextData(waku2.libp2p.pubsub); const receivedMsgPromise: Promise<WakuMessage> = new Promise((resolve) => {
waku2.relay.addObserver(resolve);
});
await waku1.relay.send(message); await waku1.relay.send(message);
const receivedMsg = await receivedPromise; const receivedMsg = await receivedMsgPromise;
expect(receivedMsg.contentTopic).to.eq(message.contentTopic); expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(message.version); expect(receivedMsg.version).to.eq(message.version);
@ -148,11 +149,15 @@ describe('Waku Relay', () => {
this.timeout(5000); this.timeout(5000);
const message = WakuMessage.fromUtf8String('Here is another message.'); const message = WakuMessage.fromUtf8String('Here is another message.');
const receivedPromise = waitForNextData(waku.libp2p.pubsub); const receivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku.relay.addObserver(resolve);
}
);
await nimWaku.sendMessage(message); await nimWaku.sendMessage(message);
const receivedMsg = await receivedPromise; const receivedMsg = await receivedMsgPromise;
expect(receivedMsg.contentTopic).to.eq(message.contentTopic); expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(message.version); expect(receivedMsg.version).to.eq(message.version);
@ -233,11 +238,15 @@ describe('Waku Relay', () => {
const message = WakuMessage.fromUtf8String('Here is another message.'); const message = WakuMessage.fromUtf8String('Here is another message.');
const receivedPromise = waitForNextData(waku.libp2p.pubsub); const receivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku.relay.addObserver(resolve);
}
);
await nimWaku.sendMessage(message); await nimWaku.sendMessage(message);
const receivedMsg = await receivedPromise; const receivedMsg = await receivedMsgPromise;
expect(receivedMsg.contentTopic).to.eq(message.contentTopic); expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(message.version); expect(receivedMsg.version).to.eq(message.version);
@ -313,21 +322,18 @@ describe('Waku Relay', () => {
const msgStr = 'Hello there!'; const msgStr = 'Hello there!';
const message = WakuMessage.fromUtf8String(msgStr); const message = WakuMessage.fromUtf8String(msgStr);
const waku2ReceivedPromise = waitForNextData(waku2.libp2p.pubsub); const waku2ReceivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku2.relay.addObserver(resolve);
}
);
await waku1.relay.send(message); await waku1.relay.send(message);
console.log('Waiting for message'); console.log('Waiting for message');
const waku2ReceivedMsg = await waku2ReceivedPromise; const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(msgStr); expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(msgStr);
}); });
}); });
}); });
}); });
async function waitForNextData(pubsub: Pubsub): Promise<WakuMessage> {
const msg = (await new Promise((resolve) => {
pubsub.once(RelayDefaultTopic, resolve);
})) as Pubsub.InMessage;
return WakuMessage.decode(msg.data);
}

View File

@ -56,6 +56,7 @@ interface GossipOptions {
*/ */
export class WakuRelay extends Gossipsub implements Pubsub { export class WakuRelay extends Gossipsub implements Pubsub {
heartbeat: RelayHeartbeat; heartbeat: RelayHeartbeat;
public observers: Array<(message: WakuMessage) => void>;
/** /**
* *
@ -72,6 +73,7 @@ export class WakuRelay extends Gossipsub implements Pubsub {
); );
this.heartbeat = new RelayHeartbeat(this); this.heartbeat = new RelayHeartbeat(this);
this.observers = [];
const multicodecs = [constants.RelayCodec]; const multicodecs = [constants.RelayCodec];
@ -86,6 +88,13 @@ export class WakuRelay extends Gossipsub implements Pubsub {
* @returns {void} * @returns {void}
*/ */
public start(): void { public start(): void {
this.on(constants.RelayDefaultTopic, (event) => {
const wakuMsg = WakuMessage.decode(event.data);
this.observers.forEach((callbackFn) => {
callbackFn(wakuMsg);
});
});
super.start(); super.start();
super.subscribe(constants.RelayDefaultTopic); super.subscribe(constants.RelayDefaultTopic);
} }
@ -101,6 +110,16 @@ export class WakuRelay extends Gossipsub implements Pubsub {
await super.publish(constants.RelayDefaultTopic, Buffer.from(msg)); await super.publish(constants.RelayDefaultTopic, Buffer.from(msg));
} }
/**
* Register an observer of new messages received via waku relay
*
* @param callback called when a new message is received via waku relay
* @returns {void}
*/
addObserver(callback: (message: WakuMessage) => void): void {
this.observers.push(callback);
}
/** /**
* Join pubsub topic. * Join pubsub topic.
* This is present to override the behavior of Gossipsub and should not * This is present to override the behavior of Gossipsub and should not