From 6e84d888c4cd50c7b81a8440729f95ee443020f4 Mon Sep 17 00:00:00 2001 From: fryorcraken Date: Fri, 3 Oct 2025 10:45:45 +1000 Subject: [PATCH] feat: Waku API's subscribe Introduced a new `subscribe` function to the Waku API. --- packages/interfaces/src/filter.ts | 17 +-- packages/interfaces/src/message.ts | 1 + packages/interfaces/src/waku.ts | 9 ++ packages/sdk/src/filter/filter.spec.ts | 11 +- packages/sdk/src/filter/filter.ts | 75 +++++-------- packages/sdk/src/filter/subscription.spec.ts | 3 - packages/sdk/src/filter/subscription.ts | 104 +++++++----------- packages/sdk/src/filter/types.ts | 9 +- packages/sdk/src/reliable_channel/events.ts | 5 +- .../src/reliable_channel/reliable_channel.ts | 67 ++++++----- packages/sdk/src/waku/waku.ts | 34 +++++- 11 files changed, 169 insertions(+), 166 deletions(-) diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index cf383fcf4e..279bc5870a 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -1,5 +1,6 @@ -import type { IDecodedMessage, IDecoder } from "./message.js"; -import type { Callback } from "./protocols.js"; +import type { IDecodedMessage } from "./message.js"; +import { ContentTopic } from "./misc.js"; +import { IRoutingInfo } from "./sharding.js"; export type IFilter = { readonly multicodec: string; @@ -38,9 +39,10 @@ export type IFilter = { * console.error("Failed to subscribe"); * } */ - subscribe( - decoders: IDecoder | IDecoder[], - callback: Callback + subscribe( + contentTopics: ContentTopic[], + routingInfo: IRoutingInfo, + callback: (msg: IDecodedMessage) => void | Promise ): Promise; /** @@ -64,8 +66,9 @@ export type IFilter = { * console.error("Failed to unsubscribe"); * } */ - unsubscribe( - decoders: IDecoder | IDecoder[] + unsubscribe( + contentTopics: ContentTopic[], + routingInfo: IRoutingInfo ): Promise; /** diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index caecb73aec..36b6bf67bc 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -105,6 +105,7 @@ export interface IEncoder { export interface IDecoder { contentTopic: string; pubsubTopic: PubsubTopic; + routingInfo: IRoutingInfo; fromWireToProtoObj: (bytes: Uint8Array) => Promise; fromProtoObj: ( pubsubTopic: string, diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 71914755f7..68f1eca088 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -11,6 +11,7 @@ import type { HealthStatus } from "./health_status.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; import { IDecodedMessage, IDecoder, IEncoder } from "./message.js"; +import { ContentTopic } from "./misc.js"; import type { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; import type { ShardId } from "./sharding.js"; @@ -251,6 +252,14 @@ export interface IWaku { */ createEncoder(params: CreateEncoderParams): IEncoder; + subscribe( + contentTopics: ContentTopic[], + callback: (message: { + contentTopic: ContentTopic; + payload: Uint8Array; + }) => void | Promise + ): Promise; + /** * @returns {boolean} `true` if the node was started and `false` otherwise */ diff --git a/packages/sdk/src/filter/filter.spec.ts b/packages/sdk/src/filter/filter.spec.ts index 4eabec6969..96577e0702 100644 --- a/packages/sdk/src/filter/filter.spec.ts +++ b/packages/sdk/src/filter/filter.spec.ts @@ -49,7 +49,11 @@ describe("Filter SDK", () => { const addStub = sinon.stub(Subscription.prototype, "add").resolves(true); const startStub = sinon.stub(Subscription.prototype, "start"); - const result = await filter.subscribe(decoder, callback); + const result = await filter.subscribe( + [testContentTopic], + testRoutingInfo, + callback + ); expect(result).to.be.true; expect(addStub.calledOnce).to.be.true; @@ -57,7 +61,10 @@ describe("Filter SDK", () => { }); it("should return false when unsubscribing from a non-existing subscription", async () => { - const result = await filter.unsubscribe(decoder); + const result = await filter.unsubscribe( + [testContentTopic], + testRoutingInfo + ); expect(result).to.be.false; }); diff --git a/packages/sdk/src/filter/filter.ts b/packages/sdk/src/filter/filter.ts index 4d12f8d32d..f74361b7d9 100644 --- a/packages/sdk/src/filter/filter.ts +++ b/packages/sdk/src/filter/filter.ts @@ -1,10 +1,10 @@ import { FilterCore } from "@waku/core"; import type { - Callback, + ContentTopic, FilterProtocolOptions, IDecodedMessage, - IDecoder, - IFilter + IFilter, + IRoutingInfo } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { Logger } from "@waku/utils"; @@ -61,31 +61,25 @@ export class Filter implements IFilter { this.subscriptions.clear(); } - public async subscribe( - decoder: IDecoder | IDecoder[], - callback: Callback + public async subscribe( + contentTopics: ContentTopic[], + routingInfo: IRoutingInfo, + callback: (msg: IDecodedMessage) => void | Promise ): Promise { - const decoders = Array.isArray(decoder) ? decoder : [decoder]; - - if (decoders.length === 0) { - throw Error("Cannot subscribe with 0 decoders."); + if (contentTopics.length === 0) { + throw Error("Cannot subscribe with 0 contentTopics."); } - const pubsubTopics = decoders.map((v) => v.pubsubTopic); - const singlePubsubTopic = pubsubTopics[0]; - - const contentTopics = decoders.map((v) => v.contentTopic); + const pubsubTopic = routingInfo.pubsubTopic; log.info( - `Subscribing to contentTopics: ${contentTopics}, pubsubTopic: ${singlePubsubTopic}` + `Subscribing to contentTopics: ${contentTopics}, pubsubTopic: ${pubsubTopic}` ); - this.throwIfTopicNotSame(pubsubTopics); - - let subscription = this.subscriptions.get(singlePubsubTopic); + let subscription = this.subscriptions.get(pubsubTopic); if (!subscription) { subscription = new Subscription({ - pubsubTopic: singlePubsubTopic, + pubsubTopic, protocol: this.protocol, config: this.config, peerManager: this.peerManager @@ -93,8 +87,8 @@ export class Filter implements IFilter { subscription.start(); } - const result = await subscription.add(decoders, callback); - this.subscriptions.set(singlePubsubTopic, subscription); + const result = await subscription.add(contentTopics, routingInfo, callback); + this.subscriptions.set(pubsubTopic, subscription); log.info( `Subscription ${result ? "successful" : "failed"} for content topic: ${contentTopics}` @@ -103,38 +97,31 @@ export class Filter implements IFilter { return result; } - public async unsubscribe( - decoder: IDecoder | IDecoder[] + public async unsubscribe( + contentTopics: ContentTopic[], + routingInfo: IRoutingInfo ): Promise { - const decoders = Array.isArray(decoder) ? decoder : [decoder]; - - if (decoders.length === 0) { - throw Error("Cannot unsubscribe with 0 decoders."); + if (contentTopics.length === 0) { + throw Error("Cannot unsubscribe with 0 contentTopics."); } - - const pubsubTopics = decoders.map((v) => v.pubsubTopic); - const singlePubsubTopic = pubsubTopics[0]; - - const contentTopics = decoders.map((v) => v.contentTopic); + const { pubsubTopic } = routingInfo; log.info( - `Unsubscribing from contentTopics: ${contentTopics}, pubsubTopic: ${singlePubsubTopic}` + `Unsubscribing from contentTopics: ${contentTopics}, pubsubTopic: ${pubsubTopic}` ); - this.throwIfTopicNotSame(pubsubTopics); - - const subscription = this.subscriptions.get(singlePubsubTopic); + const subscription = this.subscriptions.get(pubsubTopic); if (!subscription) { log.warn("No subscriptions associated with the decoder."); return false; } - const result = await subscription.remove(decoders); + const result = await subscription.remove(contentTopics); if (subscription.isEmpty()) { log.warn("Subscription has no decoders anymore, terminating it."); subscription.stop(); - this.subscriptions.delete(singlePubsubTopic); + this.subscriptions.delete(pubsubTopic); } log.info( @@ -162,16 +149,4 @@ export class Filter implements IFilter { subscription.invoke(message, peerId); } - - // Limiting to one pubsubTopic for simplicity reasons, we can enable subscription for more than one PubsubTopic at once later when requested - private throwIfTopicNotSame(pubsubTopics: string[]): void { - const first = pubsubTopics[0]; - const isSameTopic = pubsubTopics.every((t) => t === first); - - if (!isSameTopic) { - throw Error( - `Cannot subscribe to more than one pubsub topic at the same time, got pubsubTopics:${pubsubTopics}` - ); - } - } } diff --git a/packages/sdk/src/filter/subscription.spec.ts b/packages/sdk/src/filter/subscription.spec.ts index 37f3d48ed3..94eb56ce8f 100644 --- a/packages/sdk/src/filter/subscription.spec.ts +++ b/packages/sdk/src/filter/subscription.spec.ts @@ -19,7 +19,6 @@ describe("Filter Subscription", () => { let filterCore: FilterCore; let peerManager: PeerManager; let subscription: Subscription; - let decoder: IDecoder; let config: FilterProtocolOptions; beforeEach(() => { @@ -37,8 +36,6 @@ describe("Filter Subscription", () => { config, peerManager }); - - decoder = mockDecoder(); }); afterEach(() => { diff --git a/packages/sdk/src/filter/subscription.ts b/packages/sdk/src/filter/subscription.ts index 3ab7273eb1..793b53a382 100644 --- a/packages/sdk/src/filter/subscription.ts +++ b/packages/sdk/src/filter/subscription.ts @@ -5,11 +5,11 @@ import { } from "@libp2p/interface"; import { FilterCore, messageHashStr } from "@waku/core"; import type { - Callback, + ContentTopic, FilterProtocolOptions, IDecodedMessage, - IDecoder, IProtoMessage, + IRoutingInfo, PeerIdStr } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; @@ -51,8 +51,8 @@ export class Subscription { private readonly receivedMessages = new TTLSet(60_000); private callbacks = new Map< - IDecoder, - EventHandler> + ContentTopic, + EventHandler> >(); private messageEmitter = new TypedEventEmitter(); @@ -63,9 +63,7 @@ export class Subscription { private keepAliveIntervalId: number | null = null; private get contentTopics(): string[] { - const allTopics = Array.from(this.callbacks.keys()).map( - (k) => k.contentTopic - ); + const allTopics = Array.from(this.callbacks.keys()); const uniqueTopics = new Set(allTopics).values(); return Array.from(uniqueTopics); @@ -131,14 +129,13 @@ export class Subscription { return this.callbacks.size === 0; } - public async add( - decoder: IDecoder | IDecoder[], - callback: Callback + public async add( + contentTopics: ContentTopic[], + routingInfo: IRoutingInfo, + callback: (msg: IDecodedMessage) => void | Promise ): Promise { - const decoders = Array.isArray(decoder) ? decoder : [decoder]; - - for (const decoder of decoders) { - this.addSingle(decoder, callback); + for (const contentTopic of contentTopics) { + this.addSingle(contentTopic, routingInfo, callback); } return this.toSubscribeContentTopics.size > 0 @@ -146,13 +143,9 @@ export class Subscription { : true; // if content topic is not new - subscription, most likely exists } - public async remove( - decoder: IDecoder | IDecoder[] - ): Promise { - const decoders = Array.isArray(decoder) ? decoder : [decoder]; - - for (const decoder of decoders) { - this.removeSingle(decoder); + public async remove(contentTopics: ContentTopic[]): Promise { + for (const contentTopic of contentTopics) { + this.removeSingle(contentTopic); } return this.toUnsubscribeContentTopics.size > 0 @@ -177,76 +170,63 @@ export class Subscription { ); } - private addSingle( - decoder: IDecoder, - callback: Callback + private addSingle( + contentTopic: ContentTopic, + routingInfo: IRoutingInfo, + callback: (msg: IDecodedMessage) => void | Promise ): void { - log.info(`Adding subscription for contentTopic: ${decoder.contentTopic}`); + log.info(`Adding subscription for contentTopic: ${contentTopic}`); - const isNewContentTopic = !this.contentTopics.includes( - decoder.contentTopic - ); + const isNewContentTopic = !this.contentTopics.includes(contentTopic); if (isNewContentTopic) { - this.toSubscribeContentTopics.add(decoder.contentTopic); + this.toSubscribeContentTopics.add(contentTopic); } - if (this.callbacks.has(decoder)) { + if (this.callbacks.has(contentTopic)) { log.warn( - `Replacing callback associated associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}` + `Replacing callback associated associated with decoder with pubsubTopic:${routingInfo.pubsubTopic} and contentTopic:${contentTopic}` ); - const callback = this.callbacks.get(decoder); - this.callbacks.delete(decoder); - this.messageEmitter.removeEventListener(decoder.contentTopic, callback); + const callback = this.callbacks.get(contentTopic); + this.callbacks.delete(contentTopic); + this.messageEmitter.removeEventListener(contentTopic, callback); } - const eventHandler = (event: CustomEvent): void => { - void (async (): Promise => { - try { - const message = await decoder.fromProtoObj( - decoder.pubsubTopic, - event.detail as IProtoMessage - ); - void callback(message!); - } catch (err) { - log.error("Error decoding message", err); - } - })(); + const eventHandler = (event: CustomEvent): void => { + void callback(event.detail); }; - this.callbacks.set(decoder, eventHandler); - this.messageEmitter.addEventListener(decoder.contentTopic, eventHandler); + this.callbacks.set(contentTopic, eventHandler); + this.messageEmitter.addEventListener(contentTopic, eventHandler); log.info( - `Subscription added for contentTopic: ${decoder.contentTopic}, isNewContentTopic: ${isNewContentTopic}` + `Subscription added for contentTopic: ${contentTopic}, isNewContentTopic: ${isNewContentTopic}` ); } - private removeSingle(decoder: IDecoder): void { - log.info(`Removing subscription for contentTopic: ${decoder.contentTopic}`); + private removeSingle(contentTopic: ContentTopic): void { + log.info(`Removing subscription for contentTopic: ${contentTopic}`); - const callback = this.callbacks.get(decoder); + const callback = this.callbacks.get(contentTopic); if (!callback) { log.warn( - `No callback associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}` + `No callback associated with decoder with contentTopic: ${contentTopic}` ); } - this.callbacks.delete(decoder); - this.messageEmitter.removeEventListener(decoder.contentTopic, callback); + this.callbacks.delete(contentTopic); + this.messageEmitter.removeEventListener(contentTopic, callback); - const isCompletelyRemoved = !this.contentTopics.includes( - decoder.contentTopic - ); + const isCompletelyRemoved = !this.contentTopics.includes(contentTopic); if (isCompletelyRemoved) { - this.toUnsubscribeContentTopics.add(decoder.contentTopic); + this.toUnsubscribeContentTopics.add(contentTopic); } log.info( - `Subscription removed for contentTopic: ${decoder.contentTopic}, isCompletelyRemoved: ${isCompletelyRemoved}` + `Subscription removed for contentTopic: ${contentTopic}, isCompletelyRemoved: ${isCompletelyRemoved}` ); } @@ -383,8 +363,8 @@ export class Subscription { } private disposeHandlers(): void { - for (const [decoder, handler] of this.callbacks.entries()) { - this.messageEmitter.removeEventListener(decoder.contentTopic, handler); + for (const [contentTopic, handler] of this.callbacks.entries()) { + this.messageEmitter.removeEventListener(contentTopic, handler); } this.callbacks.clear(); } diff --git a/packages/sdk/src/filter/types.ts b/packages/sdk/src/filter/types.ts index 44326728d1..e748c9d93e 100644 --- a/packages/sdk/src/filter/types.ts +++ b/packages/sdk/src/filter/types.ts @@ -1,6 +1,9 @@ import type { FilterCore } from "@waku/core"; -import type { FilterProtocolOptions, Libp2p } from "@waku/interfaces"; -import type { WakuMessage } from "@waku/proto"; +import type { + FilterProtocolOptions, + IDecodedMessage, + Libp2p +} from "@waku/interfaces"; import type { PeerManager } from "../peer_manager/index.js"; @@ -11,7 +14,7 @@ export type FilterConstructorParams = { }; export type SubscriptionEvents = { - [contentTopic: string]: CustomEvent; + [contentTopic: string]: CustomEvent; }; export type SubscriptionParams = { diff --git a/packages/sdk/src/reliable_channel/events.ts b/packages/sdk/src/reliable_channel/events.ts index c79c2c0c0f..2382c73103 100644 --- a/packages/sdk/src/reliable_channel/events.ts +++ b/packages/sdk/src/reliable_channel/events.ts @@ -1,4 +1,4 @@ -import { IDecodedMessage, ProtocolError } from "@waku/interfaces"; +import { ProtocolError } from "@waku/interfaces"; import type { HistoryEntry, MessageId } from "@waku/sds"; export const ReliableChannelEvent = { @@ -56,8 +56,7 @@ export interface ReliableChannelEvents { possibleAckCount: number; }>; "message-acknowledged": CustomEvent; - // TODO probably T extends IDecodedMessage? - "message-received": CustomEvent; + "message-received": CustomEvent; "irretrievable-message": CustomEvent; "sending-message-irrecoverable-error": CustomEvent<{ messageId: MessageId; diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index 49b55aa495..8ea6b58a84 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -1,11 +1,12 @@ import { TypedEventEmitter } from "@libp2p/interface"; import { messageHash } from "@waku/core"; import { - type Callback, + type ContentTopic, type IDecodedMessage, type IDecoder, type IEncoder, type IMessage, + type IRoutingInfo, ISendOptions, type IWaku, LightPushError, @@ -132,8 +133,9 @@ export class ReliableChannel< ) => Promise; private readonly _subscribe: ( - decoders: IDecoder | IDecoder[], - callback: Callback + contentTopics: ContentTopic[], + routingInfo: IRoutingInfo, + callback: (msg: IDecodedMessage) => void | Promise ) => Promise; private readonly _retrieve?: ( @@ -324,10 +326,11 @@ export class ReliableChannel< const messageId = ReliableChannel.getMessageId(messagePayload); - // TODO: should the encoder give me the message hash? - // Encoding now to fail early, used later to get message hash - const protoMessage = await this.encoder.toProtoObj(wakuMessage); - if (!protoMessage) { + const retrievalHint = await computeRetrievalHint( + messagePayload, + this.encoder + ); + if (!retrievalHint) { this.safeSendEvent("sending-message-irrecoverable-error", { detail: { messageId: messageId, @@ -336,10 +339,6 @@ export class ReliableChannel< }); return { success: false }; } - const retrievalHint = messageHash( - this.encoder.pubsubTopic, - protoMessage - ); this.safeSendEvent("sending-message", { detail: messageId @@ -383,9 +382,13 @@ export class ReliableChannel< private async subscribe(): Promise { this.assertStarted(); - return this._subscribe(this.decoder, async (message: T) => { - await this.processIncomingMessage(message); - }); + return this._subscribe( + [this.decoder.contentTopic], + this.decoder.routingInfo, + async (message: IDecodedMessage) => { + await this.processIncomingMessage(message); + } + ); } /** @@ -393,9 +396,7 @@ export class ReliableChannel< * @param msg * @private */ - private async processIncomingMessage( - msg: T - ): Promise { + private async processIncomingMessage(msg: IDecodedMessage): Promise { // New message arrives, we need to unwrap it first const sdsMessage = SdsMessage.decode(msg.payload); @@ -422,25 +423,8 @@ export class ReliableChannel< if (sdsMessage.content && sdsMessage.content.length > 0) { // Now, process the message with callback - - // Overrides msg.payload with unwrapped payload - // TODO: can we do better? - const { payload: _p, ...allButPayload } = msg; - const unwrappedMessage = Object.assign(allButPayload, { - payload: sdsMessage.content, - hash: msg.hash, - hashStr: msg.hashStr, - version: msg.version, - contentTopic: msg.contentTopic, - pubsubTopic: msg.pubsubTopic, - timestamp: msg.timestamp, - rateLimitProof: msg.rateLimitProof, - ephemeral: msg.ephemeral, - meta: msg.meta - }); - this.safeSendEvent("message-received", { - detail: unwrappedMessage as unknown as T + detail: sdsMessage.content }); } @@ -689,3 +673,16 @@ export class ReliableChannel< } } } + +async function computeRetrievalHint( + payload: Uint8Array, + encoder: IEncoder +): Promise { + // TODO: should the encoder give me the message hash? + // Encoding now to fail early, used later to get message hash + const protoMessage = await encoder.toProtoObj({ payload }); + if (!protoMessage) { + return undefined; + } + return messageHash(encoder.pubsubTopic, protoMessage); +} diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 7336b06df7..0d0757b216 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -7,6 +7,7 @@ import { import type { MultiaddrInput } from "@multiformats/multiaddr"; import { ConnectionManager, createDecoder, createEncoder } from "@waku/core"; import type { + ContentTopic, CreateDecoderParams, CreateEncoderParams, CreateNodeOptions, @@ -28,7 +29,7 @@ import { HealthStatus, Protocols } from "@waku/interfaces"; -import { createRoutingInfo, Logger } from "@waku/utils"; +import { createRoutingInfo, Logger, pushOrInitMapSet } from "@waku/utils"; import { Filter } from "../filter/index.js"; import { HealthIndicator } from "../health_indicator/index.js"; @@ -134,6 +135,37 @@ export class WakuNode implements IWaku { ); } + public async subscribe( + contentTopics: ContentTopic[], + callback: (message: { + contentTopic: ContentTopic; + payload: Uint8Array; + }) => void | Promise + ): Promise { + // Group content topics via routing info in case they spread across several shards + const ctToRouting = new Map(); + for (const contentTopic of contentTopics) { + const routingInfo = this.createRoutingInfo(contentTopic); + pushOrInitMapSet(ctToRouting, routingInfo, contentTopic); + } + + const promises = []; + if (this.filter) { + for (const [routingInfo, contentTopics] of ctToRouting) { + promises.push( + this.filter.subscribe(contentTopics, routingInfo, callback) + ); + } + + await Promise.all(promises); + } + + if (this.relay) { + throw "not implemented"; + } + throw "no subscribe protocol available"; + } + public get peerId(): PeerId { return this.libp2p.peerId; }