From 7f98bb183de1f733bcb6f3f16fcf4cd27b641c3a Mon Sep 17 00:00:00 2001 From: Sasha Date: Thu, 2 Oct 2025 19:39:30 +0200 Subject: [PATCH] move from encoder/decoder/codec to simple message parameter --- packages/interfaces/src/waku.ts | 10 +-- packages/sdk/src/messaging/ack_manager.ts | 54 ++++++++++----- packages/sdk/src/messaging/index.ts | 3 +- packages/sdk/src/messaging/message_store.ts | 26 +++---- packages/sdk/src/messaging/messaging.ts | 27 +++----- packages/sdk/src/messaging/sender.ts | 77 ++++++++++++++++----- packages/sdk/src/messaging/utils.ts | 12 +++- packages/sdk/src/waku/waku.ts | 13 ++-- 8 files changed, 131 insertions(+), 91 deletions(-) diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 9608dc58e4..be754bc255 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -10,13 +10,7 @@ import type { IFilter } from "./filter.js"; import type { HealthStatus } from "./health_status.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; -import { - ICodec, - IDecodedMessage, - IDecoder, - IEncoder, - IMessage -} from "./message.js"; +import { ICodec, IDecodedMessage, IDecoder, IEncoder } from "./message.js"; import type { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; import type { ShardId } from "./sharding.js"; @@ -313,7 +307,7 @@ export interface IWaku { * @param {IMessage} message - The message to send * @returns {Promise} A promise that resolves to the request ID */ - send(codec: ICodec, message: IMessage): Promise; + // send(codec: ICodec, message: IMessage): Promise; /** * @returns {boolean} `true` if the node was started and `false` otherwise diff --git a/packages/sdk/src/messaging/ack_manager.ts b/packages/sdk/src/messaging/ack_manager.ts index d9a6211c17..21d856073f 100644 --- a/packages/sdk/src/messaging/ack_manager.ts +++ b/packages/sdk/src/messaging/ack_manager.ts @@ -1,4 +1,12 @@ -import { ICodec, IDecodedMessage, IFilter, IStore } from "@waku/interfaces"; +import { createDecoder } from "@waku/core"; +import { + IDecodedMessage, + IDecoder, + IFilter, + IStore, + NetworkConfig +} from "@waku/interfaces"; +import { createRoutingInfo } from "@waku/utils"; import { MessageStore } from "./message_store.js"; import { IAckManager } from "./utils.js"; @@ -7,15 +15,18 @@ type AckManagerConstructorParams = { messageStore: MessageStore; filter: IFilter; store: IStore; + networkConfig: NetworkConfig; }; export class AckManager implements IAckManager { private readonly messageStore: MessageStore; private readonly filterAckManager: FilterAckManager; private readonly storeAckManager: StoreAckManager; + private readonly networkConfig: NetworkConfig; public constructor(params: AckManagerConstructorParams) { this.messageStore = params.messageStore; + this.networkConfig = params.networkConfig; this.filterAckManager = new FilterAckManager( this.messageStore, @@ -35,16 +46,23 @@ export class AckManager implements IAckManager { this.storeAckManager.stop(); } - public async subscribe(codec: ICodec): Promise { + public async subscribe(contentTopic: string): Promise { + const decoder = createDecoder( + contentTopic, + createRoutingInfo(this.networkConfig, { + contentTopic + }) + ); + return ( - (await this.filterAckManager.subscribe(codec)) || - (await this.storeAckManager.subscribe(codec)) + (await this.filterAckManager.subscribe(decoder)) || + (await this.storeAckManager.subscribe(decoder)) ); } } -class FilterAckManager implements IAckManager { - private codecs: Set> = new Set(); +class FilterAckManager { + private decoders: Set> = new Set(); public constructor( private messageStore: MessageStore, @@ -56,20 +74,20 @@ class FilterAckManager implements IAckManager { } public async stop(): Promise { - const promises = Array.from(this.codecs.entries()).map((codec) => - this.filter.unsubscribe(codec) + const promises = Array.from(this.decoders.entries()).map((decoder) => + this.filter.unsubscribe(decoder) ); await Promise.all(promises); - this.codecs.clear(); + this.decoders.clear(); } - public async subscribe(codec: ICodec): Promise { + public async subscribe(decoder: IDecoder): Promise { const success = await this.filter.subscribe( - codec, + decoder, this.onMessage.bind(this) ); if (success) { - this.codecs.add(codec); + this.decoders.add(decoder); } return success; } @@ -83,10 +101,10 @@ class FilterAckManager implements IAckManager { } } -class StoreAckManager implements IAckManager { +class StoreAckManager { private interval: ReturnType | null = null; - private codecs: Set> = new Set(); + private decoders: Set> = new Set(); public constructor( private messageStore: MessageStore, @@ -112,15 +130,15 @@ class StoreAckManager implements IAckManager { this.interval = null; } - public async subscribe(codec: ICodec): Promise { - this.codecs.add(codec); + public async subscribe(decoder: IDecoder): Promise { + this.decoders.add(decoder); return true; } private async query(): Promise { - for (const codec of this.codecs) { + for (const decoder of this.decoders) { await this.store.queryWithOrderedCallback( - [codec], + [decoder], (message) => { if (!this.messageStore.has(message.hashStr)) { this.messageStore.add(message, { storeAck: true }); diff --git a/packages/sdk/src/messaging/index.ts b/packages/sdk/src/messaging/index.ts index 04928b7037..9dcca8b113 100644 --- a/packages/sdk/src/messaging/index.ts +++ b/packages/sdk/src/messaging/index.ts @@ -1,2 +1,3 @@ export { Messaging } from "./messaging.js"; -export type { RequestId } from "./utils.js"; +// todo: do not export this +export type { RequestId, WakuLikeMessage } from "./utils.js"; diff --git a/packages/sdk/src/messaging/message_store.ts b/packages/sdk/src/messaging/message_store.ts index 2a9159fcaa..5e9da4f2b6 100644 --- a/packages/sdk/src/messaging/message_store.ts +++ b/packages/sdk/src/messaging/message_store.ts @@ -1,10 +1,10 @@ -import { ICodec, IDecodedMessage, IMessage } from "@waku/interfaces"; +import { IDecodedMessage } from "@waku/interfaces"; import { v4 as uuidv4 } from "uuid"; +import { WakuLikeMessage } from "./utils.js"; + type QueuedMessage = { - codec?: ICodec; - messageRequest?: IMessage; - sentMessage?: IMessage; + messageRequest?: WakuLikeMessage; filterAck: boolean; storeAck: boolean; lastSentAt?: number; @@ -65,23 +65,18 @@ export class MessageStore { ): Promise { const entry = this.pendingRequests.get(requestId); - if (!entry || !entry.codec || !entry.messageRequest) { + if (!entry || !entry.messageRequest) { return; } entry.lastSentAt = Number(sentMessage.timestamp); - entry.sentMessage = sentMessage; this.pendingMessages.set(sentMessage.hashStr, requestId); } - public async queue( - codec: ICodec, - message: IMessage - ): Promise { + public async queue(message: WakuLikeMessage): Promise { const requestId = uuidv4(); this.pendingRequests.set(requestId.toString(), { - codec, messageRequest: message, filterAck: false, storeAck: false, @@ -93,19 +88,17 @@ export class MessageStore { public getMessagesToSend(): Array<{ requestId: string; - codec: ICodec; - message: IMessage; + message: WakuLikeMessage; }> { const res: Array<{ requestId: string; - codec: ICodec; - message: IMessage; + message: WakuLikeMessage; }> = []; for (const [requestId, entry] of this.pendingRequests.entries()) { const isAcknowledged = entry.filterAck || entry.storeAck; - if (!entry.codec || !entry.messageRequest || isAcknowledged) { + if (!entry.messageRequest || isAcknowledged) { continue; } @@ -118,7 +111,6 @@ export class MessageStore { if (notSent || notAcknowledged) { res.push({ requestId, - codec: entry.codec, message: entry.messageRequest }); } diff --git a/packages/sdk/src/messaging/messaging.ts b/packages/sdk/src/messaging/messaging.ts index 3568c0d831..05909a764a 100644 --- a/packages/sdk/src/messaging/messaging.ts +++ b/packages/sdk/src/messaging/messaging.ts @@ -1,25 +1,19 @@ -import { - ICodec, - IDecodedMessage, - IFilter, - ILightPush, - IMessage, - IStore -} from "@waku/interfaces"; +import { IFilter, ILightPush, IStore, NetworkConfig } from "@waku/interfaces"; import { AckManager } from "./ack_manager.js"; import { MessageStore } from "./message_store.js"; import { Sender } from "./sender.js"; -import type { RequestId } from "./utils.js"; +import type { RequestId, WakuLikeMessage } from "./utils.js"; interface IMessaging { - send(codec: ICodec, message: IMessage): Promise; + send(wakuLikeMessage: WakuLikeMessage): Promise; } type MessagingConstructorParams = { lightPush: ILightPush; filter: IFilter; store: IStore; + networkConfig: NetworkConfig; }; export class Messaging implements IMessaging { @@ -33,13 +27,15 @@ export class Messaging implements IMessaging { this.ackManager = new AckManager({ messageStore: this.messageStore, filter: params.filter, - store: params.store + store: params.store, + networkConfig: params.networkConfig }); this.sender = new Sender({ messageStore: this.messageStore, lightPush: params.lightPush, - ackManager: this.ackManager + ackManager: this.ackManager, + networkConfig: params.networkConfig }); } @@ -53,10 +49,7 @@ export class Messaging implements IMessaging { this.sender.stop(); } - public send( - codec: ICodec, - message: IMessage - ): Promise { - return this.sender.send(codec, message); + public send(wakuLikeMessage: WakuLikeMessage): Promise { + return this.sender.send(wakuLikeMessage); } } diff --git a/packages/sdk/src/messaging/sender.ts b/packages/sdk/src/messaging/sender.ts index db772e2591..ea7d150658 100644 --- a/packages/sdk/src/messaging/sender.ts +++ b/packages/sdk/src/messaging/sender.ts @@ -1,25 +1,28 @@ +import { createDecoder, createEncoder } from "@waku/core"; import { - ICodec, IDecodedMessage, ILightPush, - IMessage, - IProtoMessage + IProtoMessage, + NetworkConfig } from "@waku/interfaces"; +import { createRoutingInfo } from "@waku/utils"; import { AckManager } from "./ack_manager.js"; import type { MessageStore } from "./message_store.js"; -import type { RequestId } from "./utils.js"; +import type { RequestId, WakuLikeMessage } from "./utils.js"; type SenderConstructorParams = { messageStore: MessageStore; lightPush: ILightPush; ackManager: AckManager; + networkConfig: NetworkConfig; }; export class Sender { private readonly messageStore: MessageStore; private readonly lightPush: ILightPush; private readonly ackManager: AckManager; + private readonly networkConfig: NetworkConfig; private sendInterval: ReturnType | null = null; @@ -27,6 +30,7 @@ export class Sender { this.messageStore = params.messageStore; this.lightPush = params.lightPush; this.ackManager = params.ackManager; + this.networkConfig = params.networkConfig; } public start(): void { @@ -40,20 +44,36 @@ export class Sender { } } - public async send( - codec: ICodec, - message: IMessage - ): Promise { - const requestId = await this.messageStore.queue(codec, message); + public async send(wakuLikeMessage: WakuLikeMessage): Promise { + const requestId = await this.messageStore.queue(wakuLikeMessage); - await this.ackManager.subscribe(codec); + await this.ackManager.subscribe(wakuLikeMessage.contentTopic); - const response = await this.lightPush.send(codec, message); // todo: add to light push return of proto message or decoded message + const encoder = createEncoder({ + contentTopic: wakuLikeMessage.contentTopic, + routingInfo: createRoutingInfo(this.networkConfig, { + contentTopic: wakuLikeMessage.contentTopic + }), + ephemeral: wakuLikeMessage.ephemeral + }); + + const decoder = createDecoder( + wakuLikeMessage.contentTopic, + createRoutingInfo(this.networkConfig, { + contentTopic: wakuLikeMessage.contentTopic + }) + ); + + const response = await this.lightPush.send(encoder, { + payload: wakuLikeMessage.payload + }); // todo: add to light push return of proto message or decoded message if (response.successes.length > 0) { - const protoObj = await codec.toProtoObj(message); - const decodedMessage = await codec.fromProtoObj( - codec.pubsubTopic, + const protoObj = await encoder.toProtoObj({ + payload: wakuLikeMessage.payload + }); + const decodedMessage = await decoder.fromProtoObj( + decoder.pubsubTopic, protoObj as IProtoMessage ); @@ -66,13 +86,32 @@ export class Sender { private async backgroundSend(): Promise { const pendingRequests = this.messageStore.getMessagesToSend(); - for (const { requestId, codec, message } of pendingRequests) { - const response = await this.lightPush.send(codec, message); + for (const { requestId, message } of pendingRequests) { + const encoder = createEncoder({ + contentTopic: message.contentTopic, + routingInfo: createRoutingInfo(this.networkConfig, { + contentTopic: message.contentTopic + }), + ephemeral: message.ephemeral + }); + + const decoder = createDecoder( + message.contentTopic, + createRoutingInfo(this.networkConfig, { + contentTopic: message.contentTopic + }) + ); + + const response = await this.lightPush.send(encoder, { + payload: message.payload + }); if (response.successes.length > 0) { - const protoObj = await codec.toProtoObj(message); - const decodedMessage = await codec.fromProtoObj( - codec.pubsubTopic, + const protoObj = await encoder.toProtoObj({ + payload: message.payload + }); + const decodedMessage = await decoder.fromProtoObj( + decoder.pubsubTopic, protoObj as IProtoMessage ); diff --git a/packages/sdk/src/messaging/utils.ts b/packages/sdk/src/messaging/utils.ts index f76cb7920f..3382be0fe7 100644 --- a/packages/sdk/src/messaging/utils.ts +++ b/packages/sdk/src/messaging/utils.ts @@ -1,9 +1,15 @@ -import { ICodec, IDecodedMessage } from "@waku/interfaces"; - export type RequestId = string; +// todo: make it IMessage type +export type WakuLikeMessage = { + contentTopic: string; + payload: Uint8Array; + ephemeral?: boolean; + rateLimitProof?: boolean; +}; + export interface IAckManager { start(): void; stop(): void; - subscribe(codec: ICodec): Promise; + subscribe(contentTopic: string): Promise; } diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 720e196883..9cb2d29770 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -22,7 +22,6 @@ import type { IEncoder, IFilter, ILightPush, - IMessage, IRelay, IRoutingInfo, IStore, @@ -42,7 +41,7 @@ import { Filter } from "../filter/index.js"; import { HealthIndicator } from "../health_indicator/index.js"; import { LightPush } from "../light_push/index.js"; import { Messaging } from "../messaging/index.js"; -import type { RequestId } from "../messaging/index.js"; +import type { RequestId, WakuLikeMessage } from "../messaging/index.js"; import { PeerManager } from "../peer_manager/index.js"; import { Store } from "../store/index.js"; @@ -141,7 +140,8 @@ export class WakuNode implements IWaku { this.messaging = new Messaging({ lightPush: this.lightPush, filter: this.filter, - store: this.store + store: this.store, + networkConfig: this.networkConfig }); } @@ -303,15 +303,12 @@ export class WakuNode implements IWaku { }); } - public send( - codec: ICodec, - message: IMessage - ): Promise { + public send(wakuLikeMessage: WakuLikeMessage): Promise { if (!this.messaging) { throw new Error("Messaging not initialized"); } - return this.messaging.send(codec, message); + return this.messaging.send(wakuLikeMessage); } public createCodec(params: CreateCodecParams): ICodec {