diff --git a/packages/sdk/src/messaging/message_store.ts b/packages/sdk/src/messaging/message_store.ts index 7bde1b153b..2a9159fcaa 100644 --- a/packages/sdk/src/messaging/message_store.ts +++ b/packages/sdk/src/messaging/message_store.ts @@ -134,8 +134,8 @@ export class MessageStore { let entry = this.messages.get(hashStr); if (entry) { - entry.filterAck = true; - entry.storeAck = true; + entry.filterAck = ackParams.filterAck ?? entry.filterAck; + entry.storeAck = ackParams.storeAck ?? entry.storeAck; return; } diff --git a/packages/sdk/src/messaging/messaging.ts b/packages/sdk/src/messaging/messaging.ts index a1fbf91e58..3568c0d831 100644 --- a/packages/sdk/src/messaging/messaging.ts +++ b/packages/sdk/src/messaging/messaging.ts @@ -38,7 +38,8 @@ export class Messaging implements IMessaging { this.sender = new Sender({ messageStore: this.messageStore, - lightPush: params.lightPush + lightPush: params.lightPush, + ackManager: this.ackManager }); } diff --git a/packages/sdk/src/messaging/sender.ts b/packages/sdk/src/messaging/sender.ts index 0206d22d80..db772e2591 100644 --- a/packages/sdk/src/messaging/sender.ts +++ b/packages/sdk/src/messaging/sender.ts @@ -2,26 +2,31 @@ import { ICodec, IDecodedMessage, ILightPush, - IMessage + IMessage, + IProtoMessage } from "@waku/interfaces"; +import { AckManager } from "./ack_manager.js"; import type { MessageStore } from "./message_store.js"; import type { RequestId } from "./utils.js"; type SenderConstructorParams = { messageStore: MessageStore; lightPush: ILightPush; + ackManager: AckManager; }; export class Sender { private readonly messageStore: MessageStore; - // private readonly lightPush: ILightPush; + private readonly lightPush: ILightPush; + private readonly ackManager: AckManager; private sendInterval: ReturnType | null = null; public constructor(params: SenderConstructorParams) { this.messageStore = params.messageStore; - // this.lightPush = params.lightPush; + this.lightPush = params.lightPush; + this.ackManager = params.ackManager; } public start(): void { @@ -40,14 +45,20 @@ export class Sender { message: IMessage ): Promise { const requestId = await this.messageStore.queue(codec, message); - // const response = await this.lightPush.send(codec, message); - // if (response.successes.length > 0) { - await this.messageStore.markSent( - requestId, - (await codec.toProtoObj(message)) as IDecodedMessage - ); - // } + await this.ackManager.subscribe(codec); + + const response = await this.lightPush.send(codec, message); // todo: add to light push return of proto message or decoded message + + if (response.successes.length > 0) { + const protoObj = await codec.toProtoObj(message); + const decodedMessage = await codec.fromProtoObj( + codec.pubsubTopic, + protoObj as IProtoMessage + ); + + await this.messageStore.markSent(requestId, decodedMessage!); + } return requestId; } @@ -56,15 +67,20 @@ export class Sender { const pendingRequests = this.messageStore.getMessagesToSend(); for (const { requestId, codec, message } of pendingRequests) { - // const response = await this.lightPush.send(codec, message); + const response = await this.lightPush.send(codec, message); - // if (response.successes.length > 0) { - const sentMessage = await codec.toProtoObj(message); - await this.messageStore.markSent( - requestId, - sentMessage as IDecodedMessage - ); - // } + if (response.successes.length > 0) { + const protoObj = await codec.toProtoObj(message); + const decodedMessage = await codec.fromProtoObj( + codec.pubsubTopic, + protoObj as IProtoMessage + ); + + await this.messageStore.markSent( + requestId, + decodedMessage as IDecodedMessage + ); + } } } }