From ba6b67cc53284cbce96a765ec8b6983dcbcd497b Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Mon, 10 May 2021 14:54:08 +1000 Subject: [PATCH] Enable observers to filter by contentTopic --- examples/cli-chat/src/chat.ts | 15 ++++++----- examples/web-chat/src/App.tsx | 2 +- src/lib/waku_relay/index.spec.ts | 36 +++++++++++++++++++++++++ src/lib/waku_relay/index.ts | 46 +++++++++++++++++++++++++++----- 4 files changed, 85 insertions(+), 14 deletions(-) diff --git a/examples/cli-chat/src/chat.ts b/examples/cli-chat/src/chat.ts index 4b8d7f6026..d11f00fdd9 100644 --- a/examples/cli-chat/src/chat.ts +++ b/examples/cli-chat/src/chat.ts @@ -42,12 +42,15 @@ export default async function startChat(): Promise { console.log(`Hi, ${nick}!`); - waku.relay.addObserver((message) => { - if (message.payload) { - const chatMsg = ChatMessage.decode(message.payload); - console.log(formatMessage(chatMsg)); - } - }); + waku.relay.addObserver( + (message) => { + if (message.payload) { + const chatMsg = ChatMessage.decode(message.payload); + console.log(formatMessage(chatMsg)); + } + }, + [ChatContentTopic] + ); if (opts.staticNode) { console.log(`Dialing ${opts.staticNode}`); diff --git a/examples/web-chat/src/App.tsx b/examples/web-chat/src/App.tsx index 58b7ef3f8a..d25a3415fd 100644 --- a/examples/web-chat/src/App.tsx +++ b/examples/web-chat/src/App.tsx @@ -97,7 +97,7 @@ export default function App() { .then(() => console.log('Waku init done')) .catch((e) => console.log('Waku init failed ', e)); } else { - stateWaku.relay.addObserver(handleRelayMessage); + stateWaku.relay.addObserver(handleRelayMessage, [ChatContentTopic]); stateWaku.libp2p.peerStore.on( 'change:protocols', diff --git a/src/lib/waku_relay/index.spec.ts b/src/lib/waku_relay/index.spec.ts index 4c76b79b1a..1b5a3d4609 100644 --- a/src/lib/waku_relay/index.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -87,6 +87,42 @@ describe('Waku Relay', () => { expect(receivedMsg.payloadAsUtf8).to.eq(messageText); }); + it('Filter on content topics', async function () { + this.timeout(10000); + + const fooMessageText = 'Published on content topic foo'; + const barMessageText = 'Published on content topic bar'; + const fooMessage = WakuMessage.fromUtf8String(fooMessageText, 'foo'); + const barMessage = WakuMessage.fromUtf8String(barMessageText, 'bar'); + + const receivedBarMsgPromise: Promise = new Promise( + (resolve) => { + waku2.relay.addObserver(resolve, ['bar']); + } + ); + + const allMessages: WakuMessage[] = []; + waku2.relay.addObserver((wakuMsg) => { + allMessages.push(wakuMsg); + }); + + await waku1.relay.send(fooMessage); + await waku1.relay.send(barMessage); + + const receivedBarMsg = await receivedBarMsgPromise; + + expect(receivedBarMsg.contentTopic).to.eq(barMessage.contentTopic); + expect(receivedBarMsg.version).to.eq(barMessage.version); + expect(receivedBarMsg.payloadAsUtf8).to.eq(barMessageText); + expect(allMessages.length).to.eq(2); + expect(allMessages[0].contentTopic).to.eq(fooMessage.contentTopic); + expect(allMessages[0].version).to.eq(fooMessage.version); + expect(allMessages[0].payloadAsUtf8).to.eq(fooMessageText); + expect(allMessages[1].contentTopic).to.eq(barMessage.contentTopic); + expect(allMessages[1].version).to.eq(barMessage.version); + expect(allMessages[1].payloadAsUtf8).to.eq(barMessageText); + }); + describe('Interop: Nim', function () { describe('Nim connects to js', function () { let waku: Waku; diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index a4b5f12452..495bdda27b 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -56,7 +56,13 @@ interface GossipOptions { */ export class WakuRelay extends Gossipsub implements Pubsub { heartbeat: RelayHeartbeat; - public observers: Array<(message: WakuMessage) => void>; + /** + * observers called when receiving new message. + * Observers under key "" are always called. + */ + public observers: { + [contentTopic: string]: Array<(message: WakuMessage) => void>; + }; /** * @@ -73,7 +79,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { ); this.heartbeat = new RelayHeartbeat(this); - this.observers = []; + this.observers = {}; const multicodecs = [constants.RelayCodec]; @@ -90,9 +96,18 @@ export class WakuRelay extends Gossipsub implements Pubsub { public start(): void { this.on(constants.RelayDefaultTopic, (event) => { const wakuMsg = WakuMessage.decode(event.data); - this.observers.forEach((callbackFn) => { - callbackFn(wakuMsg); - }); + if (this.observers['']) { + this.observers[''].forEach((callbackFn) => { + callbackFn(wakuMsg); + }); + } + if (wakuMsg.contentTopic) { + if (this.observers[wakuMsg.contentTopic]) { + this.observers[wakuMsg.contentTopic].forEach((callbackFn) => { + callbackFn(wakuMsg); + }); + } + } }); super.start(); @@ -114,10 +129,27 @@ export class WakuRelay extends Gossipsub implements Pubsub { * Register an observer of new messages received via waku relay * * @param callback called when a new message is received via waku relay + * @param contentTopics Content Topics for which the callback with be called, + * all of them if undefined, [] or ["",..] is passed. * @returns {void} */ - addObserver(callback: (message: WakuMessage) => void): void { - this.observers.push(callback); + addObserver( + callback: (message: WakuMessage) => void, + contentTopics: string[] = [] + ): void { + if (contentTopics.length === 0) { + if (!this.observers['']) { + this.observers[''] = []; + } + this.observers[''].push(callback); + } else { + contentTopics.forEach((contentTopic) => { + if (!this.observers[contentTopic]) { + this.observers[contentTopic] = []; + } + this.observers[contentTopic].push(callback); + }); + } } /**