feat: add POC for Subscribe API

This commit is contained in:
Sasha 2025-10-07 23:55:49 +02:00
parent 82be279331
commit 68b6bf90bd
No known key found for this signature in database
6 changed files with 108 additions and 18 deletions

View File

@ -84,6 +84,11 @@ export interface ISendMessage {
*/
export type RequestId = string;
/**
* Listener for subscribe messages.
*/
export type SubscribeListener = (message: IDecodedMessage) => void;
export interface IMetaSetter {
(message: IProtoMessage & { meta: undefined }): Uint8Array;
}

View File

@ -4,12 +4,12 @@ import {
IDecoder,
IFilter,
IStore,
NetworkConfig
NetworkConfig,
SubscribeListener
} from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils";
import { MessageStore } from "./message_store.js";
import { IAckManager } from "./utils.js";
type AckManagerConstructorParams = {
messageStore: MessageStore;
@ -18,6 +18,14 @@ type AckManagerConstructorParams = {
networkConfig: NetworkConfig;
};
export interface IAckManager {
start(): void;
stop(): void;
observe(contentTopic: string): Promise<boolean>;
subscribe(contentTopic: string, cb: SubscribeListener): Promise<boolean>;
unsubscribe(contentTopic: string): Promise<void>;
}
export class AckManager implements IAckManager {
private readonly messageStore: MessageStore;
private readonly filterAckManager: FilterAckManager;
@ -49,7 +57,7 @@ export class AckManager implements IAckManager {
this.subscribedContentTopics.clear();
}
public async subscribe(contentTopic: string): Promise<boolean> {
public async observe(contentTopic: string): Promise<boolean> {
if (this.subscribedContentTopics.has(contentTopic)) {
return true;
}
@ -69,6 +77,24 @@ export class AckManager implements IAckManager {
])
).some((success) => success);
}
public async subscribe(
contentTopic: string,
cb: SubscribeListener
): Promise<boolean> {
const decoder = createDecoder(
contentTopic,
createRoutingInfo(this.networkConfig, {
contentTopic
})
);
return this.filterAckManager.subscribe(decoder, cb);
}
public async unsubscribe(contentTopic: string): Promise<void> {
return this.filterAckManager.unsubscribe(contentTopic);
}
}
class FilterAckManager {
@ -77,7 +103,9 @@ class FilterAckManager {
public constructor(
private messageStore: MessageStore,
private filter: IFilter
) {}
) {
this.onMessage = this.onMessage.bind(this);
}
public start(): void {
return;
@ -91,18 +119,48 @@ class FilterAckManager {
this.decoders.clear();
}
public async subscribe(decoder: IDecoder<IDecodedMessage>): Promise<boolean> {
const success = await this.filter.subscribe(
decoder,
this.onMessage.bind(this)
);
public async subscribe(
decoder: IDecoder<IDecodedMessage>,
cb?: SubscribeListener
): Promise<boolean> {
const success = await this.filter.subscribe(decoder, (message) => {
try {
cb?.(message);
} catch (error) {
// ignore
}
try {
this.onMessage(message);
} catch (error) {
// ignore
}
});
if (success) {
this.decoders.add(decoder);
}
return success;
}
private async onMessage(message: IDecodedMessage): Promise<void> {
public async unsubscribe(contentTopic: string): Promise<void> {
const decoders = Array.from(this.decoders).filter(
(decoder) => decoder.contentTopic === contentTopic
);
const promises = decoders.map((decoder) =>
this.filter.unsubscribe(decoder)
);
await Promise.all(promises);
for (const decoder of decoders) {
this.decoders.delete(decoder);
}
}
private onMessage(message: IDecodedMessage): void {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message, { filterAck: true });
}

View File

@ -4,7 +4,8 @@ import {
ISendMessage,
IStore,
NetworkConfig,
RequestId
RequestId,
SubscribeListener
} from "@waku/interfaces";
import { AckManager } from "./ack_manager.js";
@ -58,4 +59,15 @@ export class Messaging implements IMessaging {
public send(wakuLikeMessage: ISendMessage): Promise<RequestId> {
return this.sender.send(wakuLikeMessage);
}
public subscribe(
contentTopic: string,
cb: SubscribeListener
): Promise<boolean> {
return this.ackManager.subscribe(contentTopic, cb);
}
public unsubscribe(contentTopic: string): Promise<void> {
return this.ackManager.unsubscribe(contentTopic);
}
}

View File

@ -48,7 +48,7 @@ export class Sender {
public async send(message: ISendMessage): Promise<RequestId> {
const requestId = await this.messageStore.queue(message);
await this.ackManager.subscribe(message.contentTopic);
await this.ackManager.observe(message.contentTopic);
await this.sendMessage(requestId, message);
return requestId;

View File

@ -1,5 +0,0 @@
export interface IAckManager {
start(): void;
stop(): void;
subscribe(contentTopic: string): Promise<boolean>;
}

View File

@ -21,7 +21,8 @@ import type {
IWaku,
IWakuEventEmitter,
Libp2p,
NetworkConfig
NetworkConfig,
SubscribeListener
} from "@waku/interfaces";
import {
DefaultNetworkConfig,
@ -305,6 +306,25 @@ export class WakuNode implements IWaku {
return this.messaging.send(message);
}
public subscribe(
contentTopic: string,
cb: SubscribeListener
): Promise<boolean> {
if (!this.messaging) {
throw new Error("Messaging not initialized");
}
return this.messaging.subscribe(contentTopic, cb);
}
public unsubscribe(contentTopic: string): Promise<void> {
if (!this.messaging) {
throw new Error("Messaging not initialized");
}
return this.messaging.unsubscribe(contentTopic);
}
private createRoutingInfo(
contentTopic?: string,
shardId?: number