From ad675d81749bbbe405995c0795147023066a3e0f Mon Sep 17 00:00:00 2001 From: Sasha Date: Tue, 30 Sep 2025 00:12:43 +0200 Subject: [PATCH] implement send on waku --- packages/interfaces/src/waku.ts | 17 ++++++++- packages/sdk/src/messaging/ack_manager.ts | 4 +- packages/sdk/src/messaging/message_store.ts | 42 +++++++++++---------- packages/sdk/src/messaging/messaging.ts | 14 +++++-- packages/sdk/src/messaging/sender.ts | 22 +++++++---- packages/sdk/src/waku/waku.ts | 7 +++- packages/utils/src/common/mock_node.ts | 6 +++ 7 files changed, 76 insertions(+), 36 deletions(-) diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index a0c83ac746..9608dc58e4 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -10,7 +10,13 @@ import type { IFilter } from "./filter.js"; import type { HealthStatus } from "./health_status.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; -import { ICodec, IDecodedMessage, IDecoder, IEncoder } from "./message.js"; +import { + ICodec, + IDecodedMessage, + IDecoder, + IEncoder, + IMessage +} from "./message.js"; import type { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; import type { ShardId } from "./sharding.js"; @@ -300,6 +306,15 @@ export interface IWaku { */ createCodec(params: CreateCodecParams): ICodec; + /** + * Sends a message to the Waku network. + * + * @param {ICodec} codec - The codec to use for encoding the message + * @param {IMessage} message - The message to send + * @returns {Promise} A promise that resolves to the request ID + */ + send(codec: ICodec, message: IMessage): Promise; + /** * @returns {boolean} `true` if the node was started and `false` otherwise */ diff --git a/packages/sdk/src/messaging/ack_manager.ts b/packages/sdk/src/messaging/ack_manager.ts index c7874c31b1..d9a6211c17 100644 --- a/packages/sdk/src/messaging/ack_manager.ts +++ b/packages/sdk/src/messaging/ack_manager.ts @@ -76,7 +76,7 @@ class FilterAckManager implements IAckManager { private async onMessage(message: IDecodedMessage): Promise { if (!this.messageStore.has(message.hashStr)) { - this.messageStore.add(message); + this.messageStore.add(message, { filterAck: true }); } this.messageStore.markFilterAck(message.hashStr); @@ -123,7 +123,7 @@ class StoreAckManager implements IAckManager { [codec], (message) => { if (!this.messageStore.has(message.hashStr)) { - this.messageStore.add(message); + this.messageStore.add(message, { storeAck: true }); } this.messageStore.markStoreAck(message.hashStr); diff --git a/packages/sdk/src/messaging/message_store.ts b/packages/sdk/src/messaging/message_store.ts index a1babfde75..540ef38efd 100644 --- a/packages/sdk/src/messaging/message_store.ts +++ b/packages/sdk/src/messaging/message_store.ts @@ -1,8 +1,8 @@ import { messageHashStr } from "@waku/core"; -import { IDecodedMessage, IEncoder, IMessage } from "@waku/interfaces"; +import { ICodec, IDecodedMessage, IMessage } from "@waku/interfaces"; type QueuedMessage = { - encoder?: IEncoder; + codec?: ICodec; message?: IMessage; filterAck: boolean; storeAck: boolean; @@ -10,6 +10,11 @@ type QueuedMessage = { createdAt: number; }; +type AddMessageOptions = { + filterAck?: boolean; + storeAck?: boolean; +}; + type MessageStoreOptions = { resendIntervalMs?: number; }; @@ -30,11 +35,11 @@ export class MessageStore { return this.messages.has(hashStr); } - public add(message: IDecodedMessage): void { + public add(message: IDecodedMessage, options: AddMessageOptions = {}): void { if (!this.messages.has(message.hashStr)) { this.messages.set(message.hashStr, { - filterAck: false, - storeAck: false, + filterAck: options.filterAck ?? false, + storeAck: options.storeAck ?? false, createdAt: Date.now() }); } @@ -57,7 +62,7 @@ export class MessageStore { public async markSent(requestId: RequestId): Promise { const entry = this.pendingRequests.get(requestId); - if (!entry || !entry.encoder || !entry.message) { + if (!entry || !entry.codec || !entry.message) { return; } @@ -65,13 +70,13 @@ export class MessageStore { entry.lastSentAt = Date.now(); this.pendingRequests.delete(requestId); - const proto = await entry.encoder.toProtoObj(entry.message); + const proto = await entry.codec.toProtoObj(entry.message); if (!proto) { return; } - const hashStr = messageHashStr(entry.encoder.pubsubTopic, proto); + const hashStr = messageHashStr(entry.codec.pubsubTopic, proto); this.messages.set(hashStr, entry); } catch (error) { @@ -80,11 +85,14 @@ export class MessageStore { } } - public async queue(encoder: IEncoder, message: IMessage): Promise { + public async queue( + codec: ICodec, + message: IMessage + ): Promise { const requestId = crypto.randomUUID(); this.pendingRequests.set(requestId, { - encoder, + codec, message, filterAck: false, storeAck: false, @@ -96,25 +104,21 @@ export class MessageStore { public getMessagesToSend(): Array<{ requestId: string; - encoder: IEncoder; + codec: ICodec; message: IMessage; }> { const now = Date.now(); const res: Array<{ requestId: string; - encoder: IEncoder; + codec: ICodec; message: IMessage; }> = []; for (const [requestId, entry] of this.pendingRequests.entries()) { - if (!entry.encoder || !entry.message) { - continue; - } + const isAcknowledged = entry.filterAck || entry.storeAck; - const isAcknowledged = entry.filterAck || entry.storeAck; // TODO: make sure it works with message and pending requests and returns messages to re-sent that are not ack yet - - if (isAcknowledged) { + if (!entry.codec || !entry.message || isAcknowledged) { continue; } @@ -122,7 +126,7 @@ export class MessageStore { !entry.lastSentAt || now - entry.lastSentAt >= this.resendIntervalMs ) { - res.push({ requestId, encoder: entry.encoder, message: entry.message }); + res.push({ requestId, codec: entry.codec, message: entry.message }); } } diff --git a/packages/sdk/src/messaging/messaging.ts b/packages/sdk/src/messaging/messaging.ts index e843bd122a..a1fbf91e58 100644 --- a/packages/sdk/src/messaging/messaging.ts +++ b/packages/sdk/src/messaging/messaging.ts @@ -1,5 +1,6 @@ import { - IEncoder, + ICodec, + IDecodedMessage, IFilter, ILightPush, IMessage, @@ -12,7 +13,7 @@ import { Sender } from "./sender.js"; import type { RequestId } from "./utils.js"; interface IMessaging { - send(encoder: IEncoder, message: IMessage): Promise; + send(codec: ICodec, message: IMessage): Promise; } type MessagingConstructorParams = { @@ -43,13 +44,18 @@ export class Messaging implements IMessaging { public start(): void { this.ackManager.start(); + this.sender.start(); } public async stop(): Promise { await this.ackManager.stop(); + this.sender.stop(); } - public send(encoder: IEncoder, message: IMessage): Promise { - return this.sender.send(encoder, message); + public send( + codec: ICodec, + message: IMessage + ): Promise { + return this.sender.send(codec, message); } } diff --git a/packages/sdk/src/messaging/sender.ts b/packages/sdk/src/messaging/sender.ts index a7a49aa7d6..46d15f486d 100644 --- a/packages/sdk/src/messaging/sender.ts +++ b/packages/sdk/src/messaging/sender.ts @@ -1,4 +1,9 @@ -import { IEncoder, ILightPush, IMessage } from "@waku/interfaces"; +import { + ICodec, + IDecodedMessage, + ILightPush, + IMessage +} from "@waku/interfaces"; import type { MessageStore } from "./message_store.js"; import type { RequestId } from "./utils.js"; @@ -30,9 +35,12 @@ export class Sender { } } - public async send(encoder: IEncoder, message: IMessage): Promise { - const requestId = await this.messageStore.queue(encoder, message); - const response = await this.lightPush.send(encoder, message); + public async send( + codec: ICodec, + message: IMessage + ): Promise { + const requestId = await this.messageStore.queue(codec, message); + const response = await this.lightPush.send(codec, message); if (response.successes.length > 0) { await this.messageStore.markSent(requestId); @@ -44,10 +52,8 @@ export class Sender { private async backgroundSend(): Promise { const pendingRequests = this.messageStore.getMessagesToSend(); - // todo: implement chunking, error handling, retry, etc. - // todo: implement backoff and batching potentially - for (const { requestId, encoder, message } of pendingRequests) { - const response = await this.lightPush.send(encoder, message); + for (const { requestId, codec, message } of pendingRequests) { + const response = await this.lightPush.send(codec, message); if (response.successes.length > 0) { await this.messageStore.markSent(requestId); diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index d558a13369..720e196883 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -303,12 +303,15 @@ export class WakuNode implements IWaku { }); } - public send(encoder: IEncoder, message: IMessage): Promise { + public send( + codec: ICodec, + message: IMessage + ): Promise { if (!this.messaging) { throw new Error("Messaging not initialized"); } - return this.messaging.send(encoder, message); + return this.messaging.send(codec, message); } public createCodec(params: CreateCodecParams): ICodec { diff --git a/packages/utils/src/common/mock_node.ts b/packages/utils/src/common/mock_node.ts index 531709e0dd..d33d83e88e 100644 --- a/packages/utils/src/common/mock_node.ts +++ b/packages/utils/src/common/mock_node.ts @@ -159,6 +159,12 @@ export class MockWakuNode implements IWaku { public createCodec(_params: CreateCodecParams): ICodec { throw new Error("Method not implemented."); } + public send( + _codec: ICodec, + _message: IMessage + ): Promise { + throw new Error("Method not implemented."); + } public isStarted(): boolean { throw new Error("Method not implemented."); }