diff --git a/src/lib/waku_relay/index.spec.ts b/src/lib/waku_relay/index.spec.ts index 2e8eada2e5..a2292a5473 100644 --- a/src/lib/waku_relay/index.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -1,5 +1,4 @@ import { expect } from 'chai'; -import Pubsub from 'libp2p-interfaces/src/pubsub'; import TCP from 'libp2p-tcp'; import { @@ -74,11 +73,13 @@ describe('Waku Relay', () => { const message = WakuMessage.fromUtf8String('JS to JS communication works'); - const receivedPromise = waitForNextData(waku2.libp2p.pubsub); + const receivedMsgPromise: Promise = new Promise((resolve) => { + waku2.relay.addObserver(resolve); + }); await waku1.relay.send(message); - const receivedMsg = await receivedPromise; + const receivedMsg = await receivedMsgPromise; expect(receivedMsg.contentTopic).to.eq(message.contentTopic); expect(receivedMsg.version).to.eq(message.version); @@ -148,11 +149,15 @@ describe('Waku Relay', () => { this.timeout(5000); const message = WakuMessage.fromUtf8String('Here is another message.'); - const receivedPromise = waitForNextData(waku.libp2p.pubsub); + const receivedMsgPromise: Promise = new Promise( + (resolve) => { + waku.relay.addObserver(resolve); + } + ); await nimWaku.sendMessage(message); - const receivedMsg = await receivedPromise; + const receivedMsg = await receivedMsgPromise; expect(receivedMsg.contentTopic).to.eq(message.contentTopic); expect(receivedMsg.version).to.eq(message.version); @@ -233,11 +238,15 @@ describe('Waku Relay', () => { const message = WakuMessage.fromUtf8String('Here is another message.'); - const receivedPromise = waitForNextData(waku.libp2p.pubsub); + const receivedMsgPromise: Promise = new Promise( + (resolve) => { + waku.relay.addObserver(resolve); + } + ); await nimWaku.sendMessage(message); - const receivedMsg = await receivedPromise; + const receivedMsg = await receivedMsgPromise; expect(receivedMsg.contentTopic).to.eq(message.contentTopic); expect(receivedMsg.version).to.eq(message.version); @@ -313,21 +322,18 @@ describe('Waku Relay', () => { const msgStr = 'Hello there!'; const message = WakuMessage.fromUtf8String(msgStr); - const waku2ReceivedPromise = waitForNextData(waku2.libp2p.pubsub); + const waku2ReceivedMsgPromise: Promise = new Promise( + (resolve) => { + waku2.relay.addObserver(resolve); + } + ); await waku1.relay.send(message); console.log('Waiting for message'); - const waku2ReceivedMsg = await waku2ReceivedPromise; + const waku2ReceivedMsg = await waku2ReceivedMsgPromise; expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(msgStr); }); }); }); }); - -async function waitForNextData(pubsub: Pubsub): Promise { - const msg = (await new Promise((resolve) => { - pubsub.once(RelayDefaultTopic, resolve); - })) as Pubsub.InMessage; - return WakuMessage.decode(msg.data); -} diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index c9ca6175e2..a4b5f12452 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -56,6 +56,7 @@ interface GossipOptions { */ export class WakuRelay extends Gossipsub implements Pubsub { heartbeat: RelayHeartbeat; + public observers: Array<(message: WakuMessage) => void>; /** * @@ -72,6 +73,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { ); this.heartbeat = new RelayHeartbeat(this); + this.observers = []; const multicodecs = [constants.RelayCodec]; @@ -86,6 +88,13 @@ export class WakuRelay extends Gossipsub implements Pubsub { * @returns {void} */ public start(): void { + this.on(constants.RelayDefaultTopic, (event) => { + const wakuMsg = WakuMessage.decode(event.data); + this.observers.forEach((callbackFn) => { + callbackFn(wakuMsg); + }); + }); + super.start(); super.subscribe(constants.RelayDefaultTopic); } @@ -101,6 +110,16 @@ export class WakuRelay extends Gossipsub implements Pubsub { await super.publish(constants.RelayDefaultTopic, Buffer.from(msg)); } + /** + * Register an observer of new messages received via waku relay + * + * @param callback called when a new message is received via waku relay + * @returns {void} + */ + addObserver(callback: (message: WakuMessage) => void): void { + this.observers.push(callback); + } + /** * Join pubsub topic. * This is present to override the behavior of Gossipsub and should not