diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 68f1eca088..9ffe536d25 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -55,7 +55,12 @@ export interface IWakuEvents { [WakuEvent.Health]: CustomEvent; } +export interface IMessageEmitterEvents { + [contentTopic: string]: CustomEvent; +} + export type IWakuEventEmitter = TypedEventEmitter; +export type IMessageEmitter = TypedEventEmitter; export interface IWaku { libp2p: Libp2p; @@ -79,6 +84,20 @@ export interface IWaku { */ events: IWakuEventEmitter; + /** + * Emits messages on their content topic. Messages may be coming from subscriptions + * or store queries (TODO). The payload is directly emitted + * + * @example + * ```typescript + * waku.messageEmitter.addEventListener("/some/0/content-topic/proto", (event) => { + * const payload: UInt8Array = event.detail + * MyDecoder.decode(payload); + * }); + * ``` + */ + messageEmitter: IMessageEmitter; + /** * Returns a unique identifier for a node on the network. * @@ -252,13 +271,7 @@ export interface IWaku { */ createEncoder(params: CreateEncoderParams): IEncoder; - subscribe( - contentTopics: ContentTopic[], - callback: (message: { - contentTopic: ContentTopic; - payload: Uint8Array; - }) => void | Promise - ): Promise; + subscribe(contentTopics: ContentTopic[]): Promise; /** * @returns {boolean} `true` if the node was started and `false` otherwise diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 0d0757b216..03366de5a2 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -6,7 +6,7 @@ import { } from "@libp2p/interface"; import type { MultiaddrInput } from "@multiformats/multiaddr"; import { ConnectionManager, createDecoder, createEncoder } from "@waku/core"; -import type { +import { ContentTopic, CreateDecoderParams, CreateEncoderParams, @@ -16,6 +16,7 @@ import type { IEncoder, IFilter, ILightPush, + IMessageEmitter, IRelay, IRoutingInfo, IStore, @@ -55,6 +56,7 @@ export class WakuNode implements IWaku { public lightPush?: ILightPush; public readonly events: IWakuEventEmitter = new TypedEventEmitter(); + public readonly messageEmitter: IMessageEmitter = new TypedEventEmitter(); private readonly networkConfig: NetworkConfig; @@ -135,15 +137,9 @@ export class WakuNode implements IWaku { ); } - public async subscribe( - contentTopics: ContentTopic[], - callback: (message: { - contentTopic: ContentTopic; - payload: Uint8Array; - }) => void | Promise - ): Promise { + public async subscribe(contentTopics: ContentTopic[]): Promise { // Group content topics via routing info in case they spread across several shards - const ctToRouting = new Map(); + const ctToRouting: Map> = new Map(); for (const contentTopic of contentTopics) { const routingInfo = this.createRoutingInfo(contentTopic); pushOrInitMapSet(ctToRouting, routingInfo, contentTopic); @@ -152,12 +148,18 @@ export class WakuNode implements IWaku { const promises = []; if (this.filter) { for (const [routingInfo, contentTopics] of ctToRouting) { + // TODO: Returned bool from subscribe should be used promises.push( - this.filter.subscribe(contentTopics, routingInfo, callback) + this.filter.subscribe( + Array.from(contentTopics), + routingInfo, + this.emitIncomingMessages.bind(this, Array.from(contentTopics)) + ) ); } await Promise.all(promises); + return; } if (this.relay) { @@ -320,4 +322,20 @@ export class WakuNode implements IWaku { ): IRoutingInfo { return createRoutingInfo(this.networkConfig, { contentTopic, shardId }); } + + private emitIncomingMessages( + contentTopics: ContentTopic[], + message: { + contentTopic: ContentTopic; + payload: Uint8Array; + } + ): void { + if (contentTopics.includes(message.contentTopic)) { + this.messageEmitter.dispatchEvent( + new CustomEvent(message.contentTopic, { + detail: message.payload + }) + ); + } + } } diff --git a/packages/tests/src/lib/index.ts b/packages/tests/src/lib/index.ts index 85de368b23..742c09c092 100644 --- a/packages/tests/src/lib/index.ts +++ b/packages/tests/src/lib/index.ts @@ -124,14 +124,14 @@ export class ServiceNodesFleet { } class MultipleNodesMessageCollector { - public callback: (msg: IDecodedMessage) => void = () => {}; - protected messageList: Array = []; + public callback: (msg: Partial) => void = () => {}; + protected messageList: Array> = []; public constructor( private messageCollectors: MessageCollector[], private relayNodes?: ServiceNode[], private strictChecking: boolean = false ) { - this.callback = (msg: IDecodedMessage): void => { + this.callback = (msg: Partial): void => { log.info("Got a message"); this.messageList.push(msg); }; @@ -153,7 +153,9 @@ class MultipleNodesMessageCollector { } } - public getMessage(index: number): MessageRpcResponse | IDecodedMessage { + public getMessage( + index: number + ): MessageRpcResponse | Partial { return this.messageList[index]; } diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index 9483bc8ba6..7c07a8c5aa 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -29,8 +29,11 @@ import { makeLogFileName, NOISE_KEY_1, NOISE_KEY_2, + runMultipleNodes, ServiceNode, - tearDownNodes + ServiceNodesFleet, + tearDownNodes, + teardownNodesWithRedundancy } from "../src/index.js"; const TestContentTopic = "/test/1/waku/utf8"; @@ -291,3 +294,95 @@ describe("User Agent", function () { ); }); }); + +describe("Waku API", function () { + describe("WakuNode.subscribe (light node)", function () { + this.timeout(100000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + const messageText = "some message"; + const messagePayload = utf8ToBytes(messageText); + + beforeEachCustom(this, async () => { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + TestRoutingInfo, + undefined + ); + }); + + afterEachCustom(this, async () => { + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + it("Subscribe and receive messages on 2 different content topics", async function () { + // Subscribe to the first content topic and send a message. + waku.messageEmitter.addEventListener(TestContentTopic, (event) => { + // TODO: fix the callback type + serviceNodes.messageCollector.callback({ + contentTopic: TestContentTopic, + payload: event.detail + }); + }); + await waku.subscribe([TestContentTopic]); + + await waku.lightPush.send(TestEncoder, { payload: messagePayload }); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true, + "Waiting for the first message" + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestRoutingInfo.pubsubTopic + }); + + // Modify subscription to include a new content topic and send a message. + const newMessageText = "Filtering still works!"; + const newContentTopic = "/test/2/waku-filter/default"; + const newRoutingInfo = createRoutingInfo(DefaultTestNetworkConfig, { + contentTopic: newContentTopic + }); + const newEncoder = createPlainEncoder({ + contentTopic: newContentTopic, + routingInfo: newRoutingInfo + }); + // subscribe to second content topic + waku.messageEmitter.addEventListener(newContentTopic, (event) => { + // TODO: fix the callback type + serviceNodes.messageCollector.callback({ + contentTopic: TestContentTopic, + payload: event.detail + }); + }); + await waku.subscribe([newContentTopic]); + + await waku.lightPush.send(newEncoder, { + payload: utf8ToBytes(newMessageText) + }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true, + "Waiting for the second message" + ); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedContentTopic: newContentTopic, + expectedMessageText: newMessageText, + expectedPubsubTopic: TestRoutingInfo.pubsubTopic + }); + + // Send another message on the initial content topic to verify it still works. + const thirdMessageText = "Filtering still works on first subscription!"; + const thirdMessagePayload = { payload: utf8ToBytes(thirdMessageText) }; + await waku.lightPush.send(TestEncoder, thirdMessagePayload); + expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq( + true, + "Waiting for the third message" + ); + serviceNodes.messageCollector.verifyReceivedMessage(2, { + expectedMessageText: thirdMessageText, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestRoutingInfo.pubsubTopic + }); + }); + }); +});