diff --git a/examples/cli-chat/src/chat.ts b/examples/cli-chat/src/chat.ts index 07150b2fa6..d11f00fdd9 100644 --- a/examples/cli-chat/src/chat.ts +++ b/examples/cli-chat/src/chat.ts @@ -6,7 +6,6 @@ import { multiaddr, Multiaddr } from 'multiaddr'; import { ChatMessage } from 'waku/chat_message'; import Waku from 'waku/waku'; import { WakuMessage } from 'waku/waku_message'; -import { RelayDefaultTopic } from 'waku/waku_relay'; import { StoreCodec } from 'waku/waku_store'; const ChatContentTopic = 'dingpu'; @@ -43,15 +42,15 @@ export default async function startChat(): Promise { console.log(`Hi, ${nick}!`); - // TODO: Bubble event to waku, infer topic, decode msg - // Tracked with https://github.com/status-im/js-waku/issues/19 - waku.libp2p.pubsub.on(RelayDefaultTopic, (event) => { - const wakuMsg = WakuMessage.decode(event.data); - if (wakuMsg.payload) { - const chatMsg = ChatMessage.decode(wakuMsg.payload); - console.log(formatMessage(chatMsg)); - } - }); + waku.relay.addObserver( + (message) => { + if (message.payload) { + const chatMsg = ChatMessage.decode(message.payload); + console.log(formatMessage(chatMsg)); + } + }, + [ChatContentTopic] + ); if (opts.staticNode) { console.log(`Dialing ${opts.staticNode}`); diff --git a/examples/web-chat/src/App.tsx b/examples/web-chat/src/App.tsx index e2ec3b68ee..d25a3415fd 100644 --- a/examples/web-chat/src/App.tsx +++ b/examples/web-chat/src/App.tsx @@ -5,7 +5,6 @@ import './App.css'; import { ChatMessage } from './ChatMessage'; import { ChatMessage as WakuChatMessage } from 'waku/chat_message'; import { WakuMessage } from 'waku/waku_message'; -import { RelayDefaultTopic } from 'waku/waku_relay'; import { StoreCodec } from 'waku/waku_store'; import handleCommand from './command'; import Room from './Room'; @@ -52,10 +51,14 @@ export default function App() { let [nick, setNick] = useState(generate()); useEffect(() => { - const handleRelayMessage = (event: { data: Uint8Array }) => { - const chatMsg = decodeWakuMessage(event.data); - if (chatMsg) { - setNewMessages([chatMsg]); + const handleRelayMessage = (wakuMsg: WakuMessage) => { + if (wakuMsg.payload) { + const chatMsg = ChatMessage.fromWakuChatMessage( + WakuChatMessage.decode(wakuMsg.payload) + ); + if (chatMsg) { + setNewMessages([chatMsg]); + } } }; @@ -94,7 +97,7 @@ export default function App() { .then(() => console.log('Waku init done')) .catch((e) => console.log('Waku init failed ', e)); } else { - stateWaku.libp2p.pubsub.on(RelayDefaultTopic, handleRelayMessage); + stateWaku.relay.addObserver(handleRelayMessage, [ChatContentTopic]); stateWaku.libp2p.peerStore.on( 'change:protocols', @@ -103,10 +106,6 @@ export default function App() { // To clean up listener when component unmounts return () => { - stateWaku?.libp2p.pubsub.removeListener( - RelayDefaultTopic, - handleRelayMessage - ); stateWaku?.libp2p.peerStore.removeListener( 'change:protocols', handleProtocolChange.bind({}, stateWaku) @@ -173,13 +172,3 @@ async function initWaku(setter: (waku: Waku) => void) { console.log('Issue starting waku ', e); } } - -function decodeWakuMessage(data: Uint8Array): null | ChatMessage { - const wakuMsg = WakuMessage.decode(data); - if (!wakuMsg.payload) { - return null; - } - return ChatMessage.fromWakuChatMessage( - WakuChatMessage.decode(wakuMsg.payload) - ); -} diff --git a/src/lib/waku_message.ts b/src/lib/waku_message.ts index cb1331c537..407a093fea 100644 --- a/src/lib/waku_message.ts +++ b/src/lib/waku_message.ts @@ -4,8 +4,8 @@ import { Reader } from 'protobufjs/minimal'; // Protecting the user from protobuf oddities import * as proto from '../proto/waku/v2/message'; -export const DEFAULT_CONTENT_TOPIC = '/waku/2/default-content/proto'; -const DEFAULT_VERSION = 0; +export const DefaultContentTopic = '/waku/2/default-content/proto'; +const DefaultVersion = 0; export class WakuMessage { public constructor(public proto: proto.WakuMessage) {} @@ -18,12 +18,12 @@ export class WakuMessage { */ static fromUtf8String( utf8: string, - contentTopic: string = DEFAULT_CONTENT_TOPIC + contentTopic: string = DefaultContentTopic ): WakuMessage { const payload = Buffer.from(utf8, 'utf-8'); return new WakuMessage({ payload, - version: DEFAULT_VERSION, + version: DefaultVersion, contentTopic, }); } @@ -36,11 +36,11 @@ export class WakuMessage { */ static fromBytes( payload: Uint8Array, - contentTopic: string = DEFAULT_CONTENT_TOPIC + contentTopic: string = DefaultContentTopic ): WakuMessage { return new WakuMessage({ payload, - version: DEFAULT_VERSION, + version: DefaultVersion, contentTopic, }); } diff --git a/src/lib/waku_relay/index.spec.ts b/src/lib/waku_relay/index.spec.ts index 2e8eada2e5..1b5a3d4609 100644 --- a/src/lib/waku_relay/index.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -1,5 +1,4 @@ import { expect } from 'chai'; -import Pubsub from 'libp2p-interfaces/src/pubsub'; import TCP from 'libp2p-tcp'; import { @@ -72,19 +71,56 @@ describe('Waku Relay', () => { it('Publish', async function () { this.timeout(10000); - const message = WakuMessage.fromUtf8String('JS to JS communication works'); + const messageText = 'JS to JS communication works'; + const message = WakuMessage.fromUtf8String(messageText); - const receivedPromise = waitForNextData(waku2.libp2p.pubsub); + const receivedMsgPromise: Promise = new Promise((resolve) => { + waku2.relay.addObserver(resolve); + }); await waku1.relay.send(message); - const receivedMsg = await receivedPromise; + const receivedMsg = await receivedMsgPromise; expect(receivedMsg.contentTopic).to.eq(message.contentTopic); expect(receivedMsg.version).to.eq(message.version); + expect(receivedMsg.payloadAsUtf8).to.eq(messageText); + }); - const payload = Buffer.from(receivedMsg.payload!); - expect(Buffer.compare(payload, message.payload!)).to.eq(0); + it('Filter on content topics', async function () { + this.timeout(10000); + + const fooMessageText = 'Published on content topic foo'; + const barMessageText = 'Published on content topic bar'; + const fooMessage = WakuMessage.fromUtf8String(fooMessageText, 'foo'); + const barMessage = WakuMessage.fromUtf8String(barMessageText, 'bar'); + + const receivedBarMsgPromise: Promise = new Promise( + (resolve) => { + waku2.relay.addObserver(resolve, ['bar']); + } + ); + + const allMessages: WakuMessage[] = []; + waku2.relay.addObserver((wakuMsg) => { + allMessages.push(wakuMsg); + }); + + await waku1.relay.send(fooMessage); + await waku1.relay.send(barMessage); + + const receivedBarMsg = await receivedBarMsgPromise; + + expect(receivedBarMsg.contentTopic).to.eq(barMessage.contentTopic); + expect(receivedBarMsg.version).to.eq(barMessage.version); + expect(receivedBarMsg.payloadAsUtf8).to.eq(barMessageText); + expect(allMessages.length).to.eq(2); + expect(allMessages[0].contentTopic).to.eq(fooMessage.contentTopic); + expect(allMessages[0].version).to.eq(fooMessage.version); + expect(allMessages[0].payloadAsUtf8).to.eq(fooMessageText); + expect(allMessages[1].contentTopic).to.eq(barMessage.contentTopic); + expect(allMessages[1].version).to.eq(barMessage.version); + expect(allMessages[1].payloadAsUtf8).to.eq(barMessageText); }); describe('Interop: Nim', function () { @@ -126,7 +162,8 @@ describe('Waku Relay', () => { it('Js publishes to nim', async function () { this.timeout(5000); - const message = WakuMessage.fromUtf8String('This is a message'); + const messageText = 'This is a message'; + const message = WakuMessage.fromUtf8String(messageText); await waku.relay.send(message); @@ -139,26 +176,27 @@ describe('Waku Relay', () => { expect(msgs[0].contentTopic).to.equal(message.contentTopic); expect(msgs[0].version).to.equal(message.version); - - const payload = Buffer.from(msgs[0].payload!); - expect(Buffer.compare(payload, message.payload!)).to.equal(0); + expect(msgs[0].payloadAsUtf8).to.equal(messageText); }); it('Nim publishes to js', async function () { this.timeout(5000); - const message = WakuMessage.fromUtf8String('Here is another message.'); + const messageText = 'Here is another message.'; + const message = WakuMessage.fromUtf8String(messageText); - const receivedPromise = waitForNextData(waku.libp2p.pubsub); + const receivedMsgPromise: Promise = new Promise( + (resolve) => { + waku.relay.addObserver(resolve); + } + ); await nimWaku.sendMessage(message); - const receivedMsg = await receivedPromise; + const receivedMsg = await receivedMsgPromise; expect(receivedMsg.contentTopic).to.eq(message.contentTopic); expect(receivedMsg.version).to.eq(message.version); - - const payload = Buffer.from(receivedMsg.payload!); - expect(Buffer.compare(payload, message.payload!)).to.eq(0); + expect(receivedMsg.payloadAsUtf8).to.eq(messageText); }); }); @@ -209,7 +247,8 @@ describe('Waku Relay', () => { it('Js publishes to nim', async function () { this.timeout(30000); - const message = WakuMessage.fromUtf8String('This is a message'); + const messageText = 'This is a message'; + const message = WakuMessage.fromUtf8String(messageText); await delay(1000); await waku.relay.send(message); @@ -223,27 +262,28 @@ describe('Waku Relay', () => { expect(msgs[0].contentTopic).to.equal(message.contentTopic); expect(msgs[0].version).to.equal(message.version); - - const payload = Buffer.from(msgs[0].payload!); - expect(Buffer.compare(payload, message.payload!)).to.equal(0); + expect(msgs[0].payloadAsUtf8).to.equal(messageText); }); it('Nim publishes to js', async function () { await delay(200); - const message = WakuMessage.fromUtf8String('Here is another message.'); + const messageText = 'Here is another message.'; + const message = WakuMessage.fromUtf8String(messageText); - const receivedPromise = waitForNextData(waku.libp2p.pubsub); + const receivedMsgPromise: Promise = new Promise( + (resolve) => { + waku.relay.addObserver(resolve); + } + ); await nimWaku.sendMessage(message); - const receivedMsg = await receivedPromise; + const receivedMsg = await receivedMsgPromise; expect(receivedMsg.contentTopic).to.eq(message.contentTopic); expect(receivedMsg.version).to.eq(message.version); - - const payload = Buffer.from(receivedMsg.payload!); - expect(Buffer.compare(payload, message.payload!)).to.eq(0); + expect(receivedMsg.payloadAsUtf8).to.eq(messageText); }); }); @@ -313,21 +353,18 @@ describe('Waku Relay', () => { const msgStr = 'Hello there!'; const message = WakuMessage.fromUtf8String(msgStr); - const waku2ReceivedPromise = waitForNextData(waku2.libp2p.pubsub); + const waku2ReceivedMsgPromise: Promise = new Promise( + (resolve) => { + waku2.relay.addObserver(resolve); + } + ); await waku1.relay.send(message); console.log('Waiting for message'); - const waku2ReceivedMsg = await waku2ReceivedPromise; + const waku2ReceivedMsg = await waku2ReceivedMsgPromise; expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(msgStr); }); }); }); }); - -async function waitForNextData(pubsub: Pubsub): Promise { - const msg = (await new Promise((resolve) => { - pubsub.once(RelayDefaultTopic, resolve); - })) as Pubsub.InMessage; - return WakuMessage.decode(msg.data); -} diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index dccec6eeae..495bdda27b 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -12,7 +12,7 @@ import { messageIdToString, shuffle, } from 'libp2p-gossipsub/src/utils'; -import { InMessage } from 'libp2p-interfaces/src/pubsub'; +import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import PeerId from 'peer-id'; @@ -26,7 +26,7 @@ export * from './constants'; export * from './relay_heartbeat'; /** - * See GossipOptions from libp2p-gossipsub + * See {GossipOptions} from libp2p-gossipsub */ interface GossipOptions { emitSelf: boolean; @@ -48,8 +48,21 @@ interface GossipOptions { Dlazy: number; } -export class WakuRelay extends Gossipsub { +/** + * Implements the [Waku v2 Relay protocol]{@link https://rfc.vac.dev/spec/11/}. + * Must be passed as a `pubsub` module to a {Libp2p} instance. + * + * @implements {Pubsub} + */ +export class WakuRelay extends Gossipsub implements Pubsub { heartbeat: RelayHeartbeat; + /** + * observers called when receiving new message. + * Observers under key "" are always called. + */ + public observers: { + [contentTopic: string]: Array<(message: WakuMessage) => void>; + }; /** * @@ -66,6 +79,7 @@ export class WakuRelay extends Gossipsub { ); this.heartbeat = new RelayHeartbeat(this); + this.observers = {}; const multicodecs = [constants.RelayCodec]; @@ -74,28 +88,76 @@ export class WakuRelay extends Gossipsub { /** * Mounts the gossipsub protocol onto the libp2p node - * and subscribes to the default topic + * and subscribes to the default topic. + * * @override * @returns {void} */ - start(): void { + public start(): void { + this.on(constants.RelayDefaultTopic, (event) => { + const wakuMsg = WakuMessage.decode(event.data); + if (this.observers['']) { + this.observers[''].forEach((callbackFn) => { + callbackFn(wakuMsg); + }); + } + if (wakuMsg.contentTopic) { + if (this.observers[wakuMsg.contentTopic]) { + this.observers[wakuMsg.contentTopic].forEach((callbackFn) => { + callbackFn(wakuMsg); + }); + } + } + }); + super.start(); super.subscribe(constants.RelayDefaultTopic); } /** - * Send Waku messages under default topic - * @override + * Send Waku message. + * * @param {WakuMessage} message * @returns {Promise} */ - async send(message: WakuMessage): Promise { + public async send(message: WakuMessage): Promise { const msg = message.encode(); await super.publish(constants.RelayDefaultTopic, Buffer.from(msg)); } /** - * Join topic + * Register an observer of new messages received via waku relay + * + * @param callback called when a new message is received via waku relay + * @param contentTopics Content Topics for which the callback with be called, + * all of them if undefined, [] or ["",..] is passed. + * @returns {void} + */ + addObserver( + callback: (message: WakuMessage) => void, + contentTopics: string[] = [] + ): void { + if (contentTopics.length === 0) { + if (!this.observers['']) { + this.observers[''] = []; + } + this.observers[''].push(callback); + } else { + contentTopics.forEach((contentTopic) => { + if (!this.observers[contentTopic]) { + this.observers[contentTopic] = []; + } + this.observers[contentTopic].push(callback); + }); + } + } + + /** + * Join pubsub topic. + * This is present to override the behavior of Gossipsub and should not + * be used by API Consumers + * + * @ignore * @param {string} topic * @returns {void} * @override @@ -152,8 +214,11 @@ export class WakuRelay extends Gossipsub { } /** - * Publish messages + * Publish messages. + * This is present to override the behavior of Gossipsub and should not + * be used by API Consumers * + * @ignore * @override * @param {InMessage} msg * @returns {void} @@ -222,7 +287,13 @@ export class WakuRelay extends Gossipsub { } /** - * Emits gossip to peers in a particular topic + * Emits gossip to peers in a particular topic. + * + * This is present to override the behavior of Gossipsub and should not + * be used by API Consumers + * + * @ignore + * @override * @param {string} topic * @param {Set} exclude peers to exclude * @returns {void} @@ -300,7 +371,12 @@ export class WakuRelay extends Gossipsub { } /** - * Make a PRUNE control message for a peer in a topic + * Make a PRUNE control message for a peer in a topic. + * This is present to override the behavior of Gossipsub and should not + * be used by API Consumers + * + * @ignore + * @override * @param {string} id * @param {string} topic * @param {boolean} doPX diff --git a/src/lib/waku_store/history_rpc.ts b/src/lib/waku_store/history_rpc.ts index 8adb105616..3e3fdb537a 100644 --- a/src/lib/waku_store/history_rpc.ts +++ b/src/lib/waku_store/history_rpc.ts @@ -2,14 +2,14 @@ import { Reader } from 'protobufjs/minimal'; import { v4 as uuid } from 'uuid'; import * as proto from '../../proto/waku/v2/store'; -import { DEFAULT_CONTENT_TOPIC } from '../waku_message'; +import { DefaultContentTopic } from '../waku_message'; import { RelayDefaultTopic } from '../waku_relay'; export class HistoryRPC { public constructor(public proto: proto.HistoryRPC) {} static createQuery( - contentTopics: string[] = [DEFAULT_CONTENT_TOPIC], + contentTopics: string[] = [DefaultContentTopic], cursor?: proto.Index, pubsubTopic: string = RelayDefaultTopic ): HistoryRPC { diff --git a/src/test_utils/nim_waku.ts b/src/test_utils/nim_waku.ts index 34ac85ab61..3096463076 100644 --- a/src/test_utils/nim_waku.ts +++ b/src/test_utils/nim_waku.ts @@ -9,6 +9,7 @@ import PeerId from 'peer-id'; import { WakuMessage } from '../lib/waku_message'; import { RelayDefaultTopic } from '../lib/waku_relay'; +import * as proto from '../proto/waku/v2/message'; import { existsAsync, mkdirAsync, openAsync } from './async_fs'; import waitForLine from './log_file'; @@ -167,9 +168,9 @@ export class NimWaku { async messages(): Promise { this.checkProcess(); - return this.rpcCall('get_waku_v2_relay_v1_messages', [ + return this.rpcCall('get_waku_v2_relay_v1_messages', [ RelayDefaultTopic, - ]); + ]).then((msgs) => msgs.map((protoMsg) => new WakuMessage(protoMsg))); } async getPeerId(): Promise {