From 7c67abec1eadb384d681297714d80520f09bedc8 Mon Sep 17 00:00:00 2001 From: Sasha Date: Wed, 1 Oct 2025 01:16:15 +0200 Subject: [PATCH] implement and fix queuing mechanics of message store --- package-lock.json | 3 +- packages/sdk/package.json | 3 +- packages/sdk/src/messaging/message_store.ts | 123 +++++++++++++------- packages/sdk/src/messaging/sender.ts | 27 +++-- 4 files changed, 103 insertions(+), 53 deletions(-) diff --git a/package-lock.json b/package-lock.json index fa49a49a1b..ceb583fcd6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -37624,7 +37624,8 @@ "@waku/sds": "^0.0.7", "@waku/utils": "0.0.27", "libp2p": "2.8.11", - "lodash.debounce": "^4.0.8" + "lodash.debounce": "^4.0.8", + "uuid": "^10.0.0" }, "devDependencies": { "@libp2p/interface": "2.10.4", diff --git a/packages/sdk/package.json b/packages/sdk/package.json index a72b4025d7..691d426aee 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -75,7 +75,8 @@ "@waku/sds": "^0.0.7", "@waku/utils": "0.0.27", "libp2p": "2.8.11", - "lodash.debounce": "^4.0.8" + "lodash.debounce": "^4.0.8", + "uuid": "^10.0.0" }, "devDependencies": { "@libp2p/interface": "2.10.4", diff --git a/packages/sdk/src/messaging/message_store.ts b/packages/sdk/src/messaging/message_store.ts index 540ef38efd..7bde1b153b 100644 --- a/packages/sdk/src/messaging/message_store.ts +++ b/packages/sdk/src/messaging/message_store.ts @@ -1,9 +1,10 @@ -import { messageHashStr } from "@waku/core"; import { ICodec, IDecodedMessage, IMessage } from "@waku/interfaces"; +import { v4 as uuidv4 } from "uuid"; type QueuedMessage = { codec?: ICodec; - message?: IMessage; + messageRequest?: IMessage; + sentMessage?: IMessage; filterAck: boolean; storeAck: boolean; lastSentAt?: number; @@ -20,10 +21,13 @@ type MessageStoreOptions = { }; type RequestId = string; +type MessageHashStr = string; export class MessageStore { - private readonly messages: Map = new Map(); + private readonly messages: Map = new Map(); + private readonly pendingRequests: Map = new Map(); + private readonly pendingMessages: Map = new Map(); private readonly resendIntervalMs: number; @@ -46,54 +50,39 @@ export class MessageStore { } public markFilterAck(hashStr: string): void { - const entry = this.messages.get(hashStr); - if (!entry) return; - entry.filterAck = true; - // TODO: implement events + this.ackMessage(hashStr, { filterAck: true }); + this.replacePendingWithMessage(hashStr); } public markStoreAck(hashStr: string): void { - const entry = this.messages.get(hashStr); - if (!entry) return; - entry.storeAck = true; - // TODO: implement events + this.ackMessage(hashStr, { storeAck: true }); + this.replacePendingWithMessage(hashStr); } - public async markSent(requestId: RequestId): Promise { + public async markSent( + requestId: RequestId, + sentMessage: IDecodedMessage + ): Promise { const entry = this.pendingRequests.get(requestId); - if (!entry || !entry.codec || !entry.message) { + if (!entry || !entry.codec || !entry.messageRequest) { return; } - try { - entry.lastSentAt = Date.now(); - this.pendingRequests.delete(requestId); - - const proto = await entry.codec.toProtoObj(entry.message); - - if (!proto) { - return; - } - - const hashStr = messageHashStr(entry.codec.pubsubTopic, proto); - - this.messages.set(hashStr, entry); - } catch (error) { - // TODO: better recovery - this.pendingRequests.set(requestId, entry); - } + entry.lastSentAt = Number(sentMessage.timestamp); + entry.sentMessage = sentMessage; + this.pendingMessages.set(sentMessage.hashStr, requestId); } public async queue( codec: ICodec, message: IMessage ): Promise { - const requestId = crypto.randomUUID(); + const requestId = uuidv4(); - this.pendingRequests.set(requestId, { + this.pendingRequests.set(requestId.toString(), { codec, - message, + messageRequest: message, filterAck: false, storeAck: false, createdAt: Date.now() @@ -107,8 +96,6 @@ export class MessageStore { codec: ICodec; message: IMessage; }> { - const now = Date.now(); - const res: Array<{ requestId: string; codec: ICodec; @@ -118,18 +105,72 @@ export class MessageStore { for (const [requestId, entry] of this.pendingRequests.entries()) { const isAcknowledged = entry.filterAck || entry.storeAck; - if (!entry.codec || !entry.message || isAcknowledged) { + if (!entry.codec || !entry.messageRequest || isAcknowledged) { continue; } - if ( - !entry.lastSentAt || - now - entry.lastSentAt >= this.resendIntervalMs - ) { - res.push({ requestId, codec: entry.codec, message: entry.message }); + const notSent = !entry.lastSentAt; + const notAcknowledged = + entry.lastSentAt && + Date.now() - entry.lastSentAt >= this.resendIntervalMs && + !isAcknowledged; + + if (notSent || notAcknowledged) { + res.push({ + requestId, + codec: entry.codec, + message: entry.messageRequest + }); } } return res; } + + private ackMessage( + hashStr: MessageHashStr, + ackParams: AddMessageOptions = {} + ): void { + let entry = this.messages.get(hashStr); + + if (entry) { + entry.filterAck = true; + entry.storeAck = true; + return; + } + + const requestId = this.pendingMessages.get(hashStr); + + if (!requestId) { + return; + } + + entry = this.pendingRequests.get(requestId); + + if (!entry) { + return; + } + + entry.filterAck = ackParams.filterAck ?? entry.filterAck; + entry.storeAck = ackParams.storeAck ?? entry.storeAck; + } + + private replacePendingWithMessage(hashStr: MessageHashStr): void { + const requestId = this.pendingMessages.get(hashStr); + + if (!requestId) { + return; + } + + const entry = this.pendingRequests.get(requestId); + + if (!entry) { + return; + } + + this.pendingRequests.delete(requestId); + this.pendingMessages.delete(hashStr); + + this.messages.set(hashStr, entry); + } } diff --git a/packages/sdk/src/messaging/sender.ts b/packages/sdk/src/messaging/sender.ts index 46d15f486d..0206d22d80 100644 --- a/packages/sdk/src/messaging/sender.ts +++ b/packages/sdk/src/messaging/sender.ts @@ -15,13 +15,13 @@ type SenderConstructorParams = { export class Sender { private readonly messageStore: MessageStore; - private readonly lightPush: ILightPush; + // private readonly lightPush: ILightPush; private sendInterval: ReturnType | null = null; public constructor(params: SenderConstructorParams) { this.messageStore = params.messageStore; - this.lightPush = params.lightPush; + // this.lightPush = params.lightPush; } public start(): void { @@ -40,11 +40,14 @@ export class Sender { message: IMessage ): Promise { const requestId = await this.messageStore.queue(codec, message); - const response = await this.lightPush.send(codec, message); + // const response = await this.lightPush.send(codec, message); - if (response.successes.length > 0) { - await this.messageStore.markSent(requestId); - } + // if (response.successes.length > 0) { + await this.messageStore.markSent( + requestId, + (await codec.toProtoObj(message)) as IDecodedMessage + ); + // } return requestId; } @@ -53,11 +56,15 @@ export class Sender { const pendingRequests = this.messageStore.getMessagesToSend(); for (const { requestId, codec, message } of pendingRequests) { - const response = await this.lightPush.send(codec, message); + // const response = await this.lightPush.send(codec, message); - if (response.successes.length > 0) { - await this.messageStore.markSent(requestId); - } + // if (response.successes.length > 0) { + const sentMessage = await codec.toProtoObj(message); + await this.messageStore.markSent( + requestId, + sentMessage as IDecodedMessage + ); + // } } } }