implement acks and message hash saving for sent messages

This commit is contained in:
Sasha 2025-10-02 18:53:21 +02:00
parent 7c67abec1e
commit 43851045cb
No known key found for this signature in database
3 changed files with 38 additions and 21 deletions

View File

@ -134,8 +134,8 @@ export class MessageStore {
let entry = this.messages.get(hashStr);
if (entry) {
entry.filterAck = true;
entry.storeAck = true;
entry.filterAck = ackParams.filterAck ?? entry.filterAck;
entry.storeAck = ackParams.storeAck ?? entry.storeAck;
return;
}

View File

@ -38,7 +38,8 @@ export class Messaging implements IMessaging {
this.sender = new Sender({
messageStore: this.messageStore,
lightPush: params.lightPush
lightPush: params.lightPush,
ackManager: this.ackManager
});
}

View File

@ -2,26 +2,31 @@ import {
ICodec,
IDecodedMessage,
ILightPush,
IMessage
IMessage,
IProtoMessage
} from "@waku/interfaces";
import { AckManager } from "./ack_manager.js";
import type { MessageStore } from "./message_store.js";
import type { RequestId } from "./utils.js";
type SenderConstructorParams = {
messageStore: MessageStore;
lightPush: ILightPush;
ackManager: AckManager;
};
export class Sender {
private readonly messageStore: MessageStore;
// private readonly lightPush: ILightPush;
private readonly lightPush: ILightPush;
private readonly ackManager: AckManager;
private sendInterval: ReturnType<typeof setInterval> | null = null;
public constructor(params: SenderConstructorParams) {
this.messageStore = params.messageStore;
// this.lightPush = params.lightPush;
this.lightPush = params.lightPush;
this.ackManager = params.ackManager;
}
public start(): void {
@ -40,14 +45,20 @@ export class Sender {
message: IMessage
): Promise<RequestId> {
const requestId = await this.messageStore.queue(codec, message);
// const response = await this.lightPush.send(codec, message);
// if (response.successes.length > 0) {
await this.messageStore.markSent(
requestId,
(await codec.toProtoObj(message)) as IDecodedMessage
);
// }
await this.ackManager.subscribe(codec);
const response = await this.lightPush.send(codec, message); // todo: add to light push return of proto message or decoded message
if (response.successes.length > 0) {
const protoObj = await codec.toProtoObj(message);
const decodedMessage = await codec.fromProtoObj(
codec.pubsubTopic,
protoObj as IProtoMessage
);
await this.messageStore.markSent(requestId, decodedMessage!);
}
return requestId;
}
@ -56,15 +67,20 @@ export class Sender {
const pendingRequests = this.messageStore.getMessagesToSend();
for (const { requestId, codec, message } of pendingRequests) {
// const response = await this.lightPush.send(codec, message);
const response = await this.lightPush.send(codec, message);
// if (response.successes.length > 0) {
const sentMessage = await codec.toProtoObj(message);
await this.messageStore.markSent(
requestId,
sentMessage as IDecodedMessage
);
// }
if (response.successes.length > 0) {
const protoObj = await codec.toProtoObj(message);
const decodedMessage = await codec.fromProtoObj(
codec.pubsubTopic,
protoObj as IProtoMessage
);
await this.messageStore.markSent(
requestId,
decodedMessage as IDecodedMessage
);
}
}
}
}