feat: Waku API's subscribe

Introduced a new `subscribe` function to the Waku API.
This commit is contained in:
fryorcraken 2025-10-03 10:45:45 +10:00
parent e92f6a2409
commit 6e84d888c4
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
11 changed files with 169 additions and 166 deletions

View File

@ -1,5 +1,6 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { Callback } from "./protocols.js";
import type { IDecodedMessage } from "./message.js";
import { ContentTopic } from "./misc.js";
import { IRoutingInfo } from "./sharding.js";
export type IFilter = {
readonly multicodec: string;
@ -38,9 +39,10 @@ export type IFilter = {
* console.error("Failed to subscribe");
* }
*/
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
subscribe(
contentTopics: ContentTopic[],
routingInfo: IRoutingInfo,
callback: (msg: IDecodedMessage) => void | Promise<void>
): Promise<boolean>;
/**
@ -64,8 +66,9 @@ export type IFilter = {
* console.error("Failed to unsubscribe");
* }
*/
unsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
unsubscribe(
contentTopics: ContentTopic[],
routingInfo: IRoutingInfo
): Promise<boolean>;
/**

View File

@ -105,6 +105,7 @@ export interface IEncoder {
export interface IDecoder<T extends IDecodedMessage> {
contentTopic: string;
pubsubTopic: PubsubTopic;
routingInfo: IRoutingInfo;
fromWireToProtoObj: (bytes: Uint8Array) => Promise<IProtoMessage | undefined>;
fromProtoObj: (
pubsubTopic: string,

View File

@ -11,6 +11,7 @@ import type { HealthStatus } from "./health_status.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js";
import { IDecodedMessage, IDecoder, IEncoder } from "./message.js";
import { ContentTopic } from "./misc.js";
import type { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js";
import type { ShardId } from "./sharding.js";
@ -251,6 +252,14 @@ export interface IWaku {
*/
createEncoder(params: CreateEncoderParams): IEncoder;
subscribe(
contentTopics: ContentTopic[],
callback: (message: {
contentTopic: ContentTopic;
payload: Uint8Array;
}) => void | Promise<void>
): Promise<void>;
/**
* @returns {boolean} `true` if the node was started and `false` otherwise
*/

View File

@ -49,7 +49,11 @@ describe("Filter SDK", () => {
const addStub = sinon.stub(Subscription.prototype, "add").resolves(true);
const startStub = sinon.stub(Subscription.prototype, "start");
const result = await filter.subscribe(decoder, callback);
const result = await filter.subscribe(
[testContentTopic],
testRoutingInfo,
callback
);
expect(result).to.be.true;
expect(addStub.calledOnce).to.be.true;
@ -57,7 +61,10 @@ describe("Filter SDK", () => {
});
it("should return false when unsubscribing from a non-existing subscription", async () => {
const result = await filter.unsubscribe(decoder);
const result = await filter.unsubscribe(
[testContentTopic],
testRoutingInfo
);
expect(result).to.be.false;
});

View File

@ -1,10 +1,10 @@
import { FilterCore } from "@waku/core";
import type {
Callback,
ContentTopic,
FilterProtocolOptions,
IDecodedMessage,
IDecoder,
IFilter
IFilter,
IRoutingInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
@ -61,31 +61,25 @@ export class Filter implements IFilter {
this.subscriptions.clear();
}
public async subscribe<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
public async subscribe(
contentTopics: ContentTopic[],
routingInfo: IRoutingInfo,
callback: (msg: IDecodedMessage) => void | Promise<void>
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
if (decoders.length === 0) {
throw Error("Cannot subscribe with 0 decoders.");
if (contentTopics.length === 0) {
throw Error("Cannot subscribe with 0 contentTopics.");
}
const pubsubTopics = decoders.map((v) => v.pubsubTopic);
const singlePubsubTopic = pubsubTopics[0];
const contentTopics = decoders.map((v) => v.contentTopic);
const pubsubTopic = routingInfo.pubsubTopic;
log.info(
`Subscribing to contentTopics: ${contentTopics}, pubsubTopic: ${singlePubsubTopic}`
`Subscribing to contentTopics: ${contentTopics}, pubsubTopic: ${pubsubTopic}`
);
this.throwIfTopicNotSame(pubsubTopics);
let subscription = this.subscriptions.get(singlePubsubTopic);
let subscription = this.subscriptions.get(pubsubTopic);
if (!subscription) {
subscription = new Subscription({
pubsubTopic: singlePubsubTopic,
pubsubTopic,
protocol: this.protocol,
config: this.config,
peerManager: this.peerManager
@ -93,8 +87,8 @@ export class Filter implements IFilter {
subscription.start();
}
const result = await subscription.add(decoders, callback);
this.subscriptions.set(singlePubsubTopic, subscription);
const result = await subscription.add(contentTopics, routingInfo, callback);
this.subscriptions.set(pubsubTopic, subscription);
log.info(
`Subscription ${result ? "successful" : "failed"} for content topic: ${contentTopics}`
@ -103,38 +97,31 @@ export class Filter implements IFilter {
return result;
}
public async unsubscribe<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[]
public async unsubscribe(
contentTopics: ContentTopic[],
routingInfo: IRoutingInfo
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
if (decoders.length === 0) {
throw Error("Cannot unsubscribe with 0 decoders.");
if (contentTopics.length === 0) {
throw Error("Cannot unsubscribe with 0 contentTopics.");
}
const pubsubTopics = decoders.map((v) => v.pubsubTopic);
const singlePubsubTopic = pubsubTopics[0];
const contentTopics = decoders.map((v) => v.contentTopic);
const { pubsubTopic } = routingInfo;
log.info(
`Unsubscribing from contentTopics: ${contentTopics}, pubsubTopic: ${singlePubsubTopic}`
`Unsubscribing from contentTopics: ${contentTopics}, pubsubTopic: ${pubsubTopic}`
);
this.throwIfTopicNotSame(pubsubTopics);
const subscription = this.subscriptions.get(singlePubsubTopic);
const subscription = this.subscriptions.get(pubsubTopic);
if (!subscription) {
log.warn("No subscriptions associated with the decoder.");
return false;
}
const result = await subscription.remove(decoders);
const result = await subscription.remove(contentTopics);
if (subscription.isEmpty()) {
log.warn("Subscription has no decoders anymore, terminating it.");
subscription.stop();
this.subscriptions.delete(singlePubsubTopic);
this.subscriptions.delete(pubsubTopic);
}
log.info(
@ -162,16 +149,4 @@ export class Filter implements IFilter {
subscription.invoke(message, peerId);
}
// Limiting to one pubsubTopic for simplicity reasons, we can enable subscription for more than one PubsubTopic at once later when requested
private throwIfTopicNotSame(pubsubTopics: string[]): void {
const first = pubsubTopics[0];
const isSameTopic = pubsubTopics.every((t) => t === first);
if (!isSameTopic) {
throw Error(
`Cannot subscribe to more than one pubsub topic at the same time, got pubsubTopics:${pubsubTopics}`
);
}
}
}

View File

@ -19,7 +19,6 @@ describe("Filter Subscription", () => {
let filterCore: FilterCore;
let peerManager: PeerManager;
let subscription: Subscription;
let decoder: IDecoder<IDecodedMessage>;
let config: FilterProtocolOptions;
beforeEach(() => {
@ -37,8 +36,6 @@ describe("Filter Subscription", () => {
config,
peerManager
});
decoder = mockDecoder();
});
afterEach(() => {

View File

@ -5,11 +5,11 @@ import {
} from "@libp2p/interface";
import { FilterCore, messageHashStr } from "@waku/core";
import type {
Callback,
ContentTopic,
FilterProtocolOptions,
IDecodedMessage,
IDecoder,
IProtoMessage,
IRoutingInfo,
PeerIdStr
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
@ -51,8 +51,8 @@ export class Subscription {
private readonly receivedMessages = new TTLSet<string>(60_000);
private callbacks = new Map<
IDecoder<IDecodedMessage>,
EventHandler<CustomEvent<WakuMessage>>
ContentTopic,
EventHandler<CustomEvent<IDecodedMessage>>
>();
private messageEmitter = new TypedEventEmitter<SubscriptionEvents>();
@ -63,9 +63,7 @@ export class Subscription {
private keepAliveIntervalId: number | null = null;
private get contentTopics(): string[] {
const allTopics = Array.from(this.callbacks.keys()).map(
(k) => k.contentTopic
);
const allTopics = Array.from(this.callbacks.keys());
const uniqueTopics = new Set(allTopics).values();
return Array.from(uniqueTopics);
@ -131,14 +129,13 @@ export class Subscription {
return this.callbacks.size === 0;
}
public async add<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
public async add(
contentTopics: ContentTopic[],
routingInfo: IRoutingInfo,
callback: (msg: IDecodedMessage) => void | Promise<void>
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
for (const decoder of decoders) {
this.addSingle(decoder, callback);
for (const contentTopic of contentTopics) {
this.addSingle(contentTopic, routingInfo, callback);
}
return this.toSubscribeContentTopics.size > 0
@ -146,13 +143,9 @@ export class Subscription {
: true; // if content topic is not new - subscription, most likely exists
}
public async remove<T extends IDecodedMessage>(
decoder: IDecoder<T> | IDecoder<T>[]
): Promise<boolean> {
const decoders = Array.isArray(decoder) ? decoder : [decoder];
for (const decoder of decoders) {
this.removeSingle(decoder);
public async remove(contentTopics: ContentTopic[]): Promise<boolean> {
for (const contentTopic of contentTopics) {
this.removeSingle(contentTopic);
}
return this.toUnsubscribeContentTopics.size > 0
@ -177,76 +170,63 @@ export class Subscription {
);
}
private addSingle<T extends IDecodedMessage>(
decoder: IDecoder<T>,
callback: Callback<T>
private addSingle(
contentTopic: ContentTopic,
routingInfo: IRoutingInfo,
callback: (msg: IDecodedMessage) => void | Promise<void>
): void {
log.info(`Adding subscription for contentTopic: ${decoder.contentTopic}`);
log.info(`Adding subscription for contentTopic: ${contentTopic}`);
const isNewContentTopic = !this.contentTopics.includes(
decoder.contentTopic
);
const isNewContentTopic = !this.contentTopics.includes(contentTopic);
if (isNewContentTopic) {
this.toSubscribeContentTopics.add(decoder.contentTopic);
this.toSubscribeContentTopics.add(contentTopic);
}
if (this.callbacks.has(decoder)) {
if (this.callbacks.has(contentTopic)) {
log.warn(
`Replacing callback associated associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}`
`Replacing callback associated associated with decoder with pubsubTopic:${routingInfo.pubsubTopic} and contentTopic:${contentTopic}`
);
const callback = this.callbacks.get(decoder);
this.callbacks.delete(decoder);
this.messageEmitter.removeEventListener(decoder.contentTopic, callback);
const callback = this.callbacks.get(contentTopic);
this.callbacks.delete(contentTopic);
this.messageEmitter.removeEventListener(contentTopic, callback);
}
const eventHandler = (event: CustomEvent<WakuMessage>): void => {
void (async (): Promise<void> => {
try {
const message = await decoder.fromProtoObj(
decoder.pubsubTopic,
event.detail as IProtoMessage
);
void callback(message!);
} catch (err) {
log.error("Error decoding message", err);
}
})();
const eventHandler = (event: CustomEvent<IDecodedMessage>): void => {
void callback(event.detail);
};
this.callbacks.set(decoder, eventHandler);
this.messageEmitter.addEventListener(decoder.contentTopic, eventHandler);
this.callbacks.set(contentTopic, eventHandler);
this.messageEmitter.addEventListener(contentTopic, eventHandler);
log.info(
`Subscription added for contentTopic: ${decoder.contentTopic}, isNewContentTopic: ${isNewContentTopic}`
`Subscription added for contentTopic: ${contentTopic}, isNewContentTopic: ${isNewContentTopic}`
);
}
private removeSingle<T extends IDecodedMessage>(decoder: IDecoder<T>): void {
log.info(`Removing subscription for contentTopic: ${decoder.contentTopic}`);
private removeSingle(contentTopic: ContentTopic): void {
log.info(`Removing subscription for contentTopic: ${contentTopic}`);
const callback = this.callbacks.get(decoder);
const callback = this.callbacks.get(contentTopic);
if (!callback) {
log.warn(
`No callback associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}`
`No callback associated with decoder with contentTopic: ${contentTopic}`
);
}
this.callbacks.delete(decoder);
this.messageEmitter.removeEventListener(decoder.contentTopic, callback);
this.callbacks.delete(contentTopic);
this.messageEmitter.removeEventListener(contentTopic, callback);
const isCompletelyRemoved = !this.contentTopics.includes(
decoder.contentTopic
);
const isCompletelyRemoved = !this.contentTopics.includes(contentTopic);
if (isCompletelyRemoved) {
this.toUnsubscribeContentTopics.add(decoder.contentTopic);
this.toUnsubscribeContentTopics.add(contentTopic);
}
log.info(
`Subscription removed for contentTopic: ${decoder.contentTopic}, isCompletelyRemoved: ${isCompletelyRemoved}`
`Subscription removed for contentTopic: ${contentTopic}, isCompletelyRemoved: ${isCompletelyRemoved}`
);
}
@ -383,8 +363,8 @@ export class Subscription {
}
private disposeHandlers(): void {
for (const [decoder, handler] of this.callbacks.entries()) {
this.messageEmitter.removeEventListener(decoder.contentTopic, handler);
for (const [contentTopic, handler] of this.callbacks.entries()) {
this.messageEmitter.removeEventListener(contentTopic, handler);
}
this.callbacks.clear();
}

View File

@ -1,6 +1,9 @@
import type { FilterCore } from "@waku/core";
import type { FilterProtocolOptions, Libp2p } from "@waku/interfaces";
import type { WakuMessage } from "@waku/proto";
import type {
FilterProtocolOptions,
IDecodedMessage,
Libp2p
} from "@waku/interfaces";
import type { PeerManager } from "../peer_manager/index.js";
@ -11,7 +14,7 @@ export type FilterConstructorParams = {
};
export type SubscriptionEvents = {
[contentTopic: string]: CustomEvent<WakuMessage>;
[contentTopic: string]: CustomEvent<IDecodedMessage>;
};
export type SubscriptionParams = {

View File

@ -1,4 +1,4 @@
import { IDecodedMessage, ProtocolError } from "@waku/interfaces";
import { ProtocolError } from "@waku/interfaces";
import type { HistoryEntry, MessageId } from "@waku/sds";
export const ReliableChannelEvent = {
@ -56,8 +56,7 @@ export interface ReliableChannelEvents {
possibleAckCount: number;
}>;
"message-acknowledged": CustomEvent<MessageId>;
// TODO probably T extends IDecodedMessage?
"message-received": CustomEvent<IDecodedMessage>;
"message-received": CustomEvent<Uint8Array>;
"irretrievable-message": CustomEvent<HistoryEntry>;
"sending-message-irrecoverable-error": CustomEvent<{
messageId: MessageId;

View File

@ -1,11 +1,12 @@
import { TypedEventEmitter } from "@libp2p/interface";
import { messageHash } from "@waku/core";
import {
type Callback,
type ContentTopic,
type IDecodedMessage,
type IDecoder,
type IEncoder,
type IMessage,
type IRoutingInfo,
ISendOptions,
type IWaku,
LightPushError,
@ -132,8 +133,9 @@ export class ReliableChannel<
) => Promise<LightPushSDKResult>;
private readonly _subscribe: (
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
contentTopics: ContentTopic[],
routingInfo: IRoutingInfo,
callback: (msg: IDecodedMessage) => void | Promise<void>
) => Promise<boolean>;
private readonly _retrieve?: <T extends IDecodedMessage>(
@ -324,10 +326,11 @@ export class ReliableChannel<
const messageId = ReliableChannel.getMessageId(messagePayload);
// TODO: should the encoder give me the message hash?
// Encoding now to fail early, used later to get message hash
const protoMessage = await this.encoder.toProtoObj(wakuMessage);
if (!protoMessage) {
const retrievalHint = await computeRetrievalHint(
messagePayload,
this.encoder
);
if (!retrievalHint) {
this.safeSendEvent("sending-message-irrecoverable-error", {
detail: {
messageId: messageId,
@ -336,10 +339,6 @@ export class ReliableChannel<
});
return { success: false };
}
const retrievalHint = messageHash(
this.encoder.pubsubTopic,
protoMessage
);
this.safeSendEvent("sending-message", {
detail: messageId
@ -383,9 +382,13 @@ export class ReliableChannel<
private async subscribe(): Promise<boolean> {
this.assertStarted();
return this._subscribe(this.decoder, async (message: T) => {
await this.processIncomingMessage(message);
});
return this._subscribe(
[this.decoder.contentTopic],
this.decoder.routingInfo,
async (message: IDecodedMessage) => {
await this.processIncomingMessage(message);
}
);
}
/**
@ -393,9 +396,7 @@ export class ReliableChannel<
* @param msg
* @private
*/
private async processIncomingMessage<T extends IDecodedMessage>(
msg: T
): Promise<void> {
private async processIncomingMessage(msg: IDecodedMessage): Promise<void> {
// New message arrives, we need to unwrap it first
const sdsMessage = SdsMessage.decode(msg.payload);
@ -422,25 +423,8 @@ export class ReliableChannel<
if (sdsMessage.content && sdsMessage.content.length > 0) {
// Now, process the message with callback
// Overrides msg.payload with unwrapped payload
// TODO: can we do better?
const { payload: _p, ...allButPayload } = msg;
const unwrappedMessage = Object.assign(allButPayload, {
payload: sdsMessage.content,
hash: msg.hash,
hashStr: msg.hashStr,
version: msg.version,
contentTopic: msg.contentTopic,
pubsubTopic: msg.pubsubTopic,
timestamp: msg.timestamp,
rateLimitProof: msg.rateLimitProof,
ephemeral: msg.ephemeral,
meta: msg.meta
});
this.safeSendEvent("message-received", {
detail: unwrappedMessage as unknown as T
detail: sdsMessage.content
});
}
@ -689,3 +673,16 @@ export class ReliableChannel<
}
}
}
async function computeRetrievalHint(
payload: Uint8Array,
encoder: IEncoder
): Promise<Uint8Array | undefined> {
// TODO: should the encoder give me the message hash?
// Encoding now to fail early, used later to get message hash
const protoMessage = await encoder.toProtoObj({ payload });
if (!protoMessage) {
return undefined;
}
return messageHash(encoder.pubsubTopic, protoMessage);
}

View File

@ -7,6 +7,7 @@ import {
import type { MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager, createDecoder, createEncoder } from "@waku/core";
import type {
ContentTopic,
CreateDecoderParams,
CreateEncoderParams,
CreateNodeOptions,
@ -28,7 +29,7 @@ import {
HealthStatus,
Protocols
} from "@waku/interfaces";
import { createRoutingInfo, Logger } from "@waku/utils";
import { createRoutingInfo, Logger, pushOrInitMapSet } from "@waku/utils";
import { Filter } from "../filter/index.js";
import { HealthIndicator } from "../health_indicator/index.js";
@ -134,6 +135,37 @@ export class WakuNode implements IWaku {
);
}
public async subscribe(
contentTopics: ContentTopic[],
callback: (message: {
contentTopic: ContentTopic;
payload: Uint8Array;
}) => void | Promise<void>
): Promise<void> {
// Group content topics via routing info in case they spread across several shards
const ctToRouting = new Map();
for (const contentTopic of contentTopics) {
const routingInfo = this.createRoutingInfo(contentTopic);
pushOrInitMapSet(ctToRouting, routingInfo, contentTopic);
}
const promises = [];
if (this.filter) {
for (const [routingInfo, contentTopics] of ctToRouting) {
promises.push(
this.filter.subscribe(contentTopics, routingInfo, callback)
);
}
await Promise.all(promises);
}
if (this.relay) {
throw "not implemented";
}
throw "no subscribe protocol available";
}
public get peerId(): PeerId {
return this.libp2p.peerId;
}