implement and fix queuing mechanics of message store

This commit is contained in:
Sasha 2025-10-01 01:16:15 +02:00
parent ad675d8174
commit 7c67abec1e
No known key found for this signature in database
4 changed files with 103 additions and 53 deletions

3
package-lock.json generated
View File

@ -37624,7 +37624,8 @@
"@waku/sds": "^0.0.7",
"@waku/utils": "0.0.27",
"libp2p": "2.8.11",
"lodash.debounce": "^4.0.8"
"lodash.debounce": "^4.0.8",
"uuid": "^10.0.0"
},
"devDependencies": {
"@libp2p/interface": "2.10.4",

View File

@ -75,7 +75,8 @@
"@waku/sds": "^0.0.7",
"@waku/utils": "0.0.27",
"libp2p": "2.8.11",
"lodash.debounce": "^4.0.8"
"lodash.debounce": "^4.0.8",
"uuid": "^10.0.0"
},
"devDependencies": {
"@libp2p/interface": "2.10.4",

View File

@ -1,9 +1,10 @@
import { messageHashStr } from "@waku/core";
import { ICodec, IDecodedMessage, IMessage } from "@waku/interfaces";
import { v4 as uuidv4 } from "uuid";
type QueuedMessage = {
codec?: ICodec<IDecodedMessage>;
message?: IMessage;
messageRequest?: IMessage;
sentMessage?: IMessage;
filterAck: boolean;
storeAck: boolean;
lastSentAt?: number;
@ -20,10 +21,13 @@ type MessageStoreOptions = {
};
type RequestId = string;
type MessageHashStr = string;
export class MessageStore {
private readonly messages: Map<string, QueuedMessage> = new Map();
private readonly messages: Map<MessageHashStr, QueuedMessage> = new Map();
private readonly pendingRequests: Map<RequestId, QueuedMessage> = new Map();
private readonly pendingMessages: Map<MessageHashStr, RequestId> = new Map();
private readonly resendIntervalMs: number;
@ -46,54 +50,39 @@ export class MessageStore {
}
public markFilterAck(hashStr: string): void {
const entry = this.messages.get(hashStr);
if (!entry) return;
entry.filterAck = true;
// TODO: implement events
this.ackMessage(hashStr, { filterAck: true });
this.replacePendingWithMessage(hashStr);
}
public markStoreAck(hashStr: string): void {
const entry = this.messages.get(hashStr);
if (!entry) return;
entry.storeAck = true;
// TODO: implement events
this.ackMessage(hashStr, { storeAck: true });
this.replacePendingWithMessage(hashStr);
}
public async markSent(requestId: RequestId): Promise<void> {
public async markSent(
requestId: RequestId,
sentMessage: IDecodedMessage
): Promise<void> {
const entry = this.pendingRequests.get(requestId);
if (!entry || !entry.codec || !entry.message) {
if (!entry || !entry.codec || !entry.messageRequest) {
return;
}
try {
entry.lastSentAt = Date.now();
this.pendingRequests.delete(requestId);
const proto = await entry.codec.toProtoObj(entry.message);
if (!proto) {
return;
}
const hashStr = messageHashStr(entry.codec.pubsubTopic, proto);
this.messages.set(hashStr, entry);
} catch (error) {
// TODO: better recovery
this.pendingRequests.set(requestId, entry);
}
entry.lastSentAt = Number(sentMessage.timestamp);
entry.sentMessage = sentMessage;
this.pendingMessages.set(sentMessage.hashStr, requestId);
}
public async queue(
codec: ICodec<IDecodedMessage>,
message: IMessage
): Promise<RequestId> {
const requestId = crypto.randomUUID();
const requestId = uuidv4();
this.pendingRequests.set(requestId, {
this.pendingRequests.set(requestId.toString(), {
codec,
message,
messageRequest: message,
filterAck: false,
storeAck: false,
createdAt: Date.now()
@ -107,8 +96,6 @@ export class MessageStore {
codec: ICodec<IDecodedMessage>;
message: IMessage;
}> {
const now = Date.now();
const res: Array<{
requestId: string;
codec: ICodec<IDecodedMessage>;
@ -118,18 +105,72 @@ export class MessageStore {
for (const [requestId, entry] of this.pendingRequests.entries()) {
const isAcknowledged = entry.filterAck || entry.storeAck;
if (!entry.codec || !entry.message || isAcknowledged) {
if (!entry.codec || !entry.messageRequest || isAcknowledged) {
continue;
}
if (
!entry.lastSentAt ||
now - entry.lastSentAt >= this.resendIntervalMs
) {
res.push({ requestId, codec: entry.codec, message: entry.message });
const notSent = !entry.lastSentAt;
const notAcknowledged =
entry.lastSentAt &&
Date.now() - entry.lastSentAt >= this.resendIntervalMs &&
!isAcknowledged;
if (notSent || notAcknowledged) {
res.push({
requestId,
codec: entry.codec,
message: entry.messageRequest
});
}
}
return res;
}
private ackMessage(
hashStr: MessageHashStr,
ackParams: AddMessageOptions = {}
): void {
let entry = this.messages.get(hashStr);
if (entry) {
entry.filterAck = true;
entry.storeAck = true;
return;
}
const requestId = this.pendingMessages.get(hashStr);
if (!requestId) {
return;
}
entry = this.pendingRequests.get(requestId);
if (!entry) {
return;
}
entry.filterAck = ackParams.filterAck ?? entry.filterAck;
entry.storeAck = ackParams.storeAck ?? entry.storeAck;
}
private replacePendingWithMessage(hashStr: MessageHashStr): void {
const requestId = this.pendingMessages.get(hashStr);
if (!requestId) {
return;
}
const entry = this.pendingRequests.get(requestId);
if (!entry) {
return;
}
this.pendingRequests.delete(requestId);
this.pendingMessages.delete(hashStr);
this.messages.set(hashStr, entry);
}
}

View File

@ -15,13 +15,13 @@ type SenderConstructorParams = {
export class Sender {
private readonly messageStore: MessageStore;
private readonly lightPush: ILightPush;
// private readonly lightPush: ILightPush;
private sendInterval: ReturnType<typeof setInterval> | null = null;
public constructor(params: SenderConstructorParams) {
this.messageStore = params.messageStore;
this.lightPush = params.lightPush;
// this.lightPush = params.lightPush;
}
public start(): void {
@ -40,11 +40,14 @@ export class Sender {
message: IMessage
): Promise<RequestId> {
const requestId = await this.messageStore.queue(codec, message);
const response = await this.lightPush.send(codec, message);
// const response = await this.lightPush.send(codec, message);
if (response.successes.length > 0) {
await this.messageStore.markSent(requestId);
}
// if (response.successes.length > 0) {
await this.messageStore.markSent(
requestId,
(await codec.toProtoObj(message)) as IDecodedMessage
);
// }
return requestId;
}
@ -53,11 +56,15 @@ export class Sender {
const pendingRequests = this.messageStore.getMessagesToSend();
for (const { requestId, codec, message } of pendingRequests) {
const response = await this.lightPush.send(codec, message);
// const response = await this.lightPush.send(codec, message);
if (response.successes.length > 0) {
await this.messageStore.markSent(requestId);
}
// if (response.successes.length > 0) {
const sentMessage = await codec.toProtoObj(message);
await this.messageStore.markSent(
requestId,
sentMessage as IDecodedMessage
);
// }
}
}
}