diff --git a/packages/sdk/src/messaging/ack_manager.ts b/packages/sdk/src/messaging/ack_manager.ts new file mode 100644 index 0000000000..5f41d1a771 --- /dev/null +++ b/packages/sdk/src/messaging/ack_manager.ts @@ -0,0 +1,138 @@ +import { IDecodedMessage, IFilter, IStore } from "@waku/interfaces"; + +import { MessageStore } from "./message_store.js"; +import { IAckManager, ICodec } from "./utils.js"; + +type AckManagerConstructorParams = { + messageStore: MessageStore; + filter: IFilter; + store: IStore; +}; + +export class AckManager implements IAckManager { + private readonly messageStore: MessageStore; + private readonly filterAckManager: FilterAckManager; + private readonly storeAckManager: StoreAckManager; + + public constructor(params: AckManagerConstructorParams) { + this.messageStore = params.messageStore; + + this.filterAckManager = new FilterAckManager( + this.messageStore, + params.filter + ); + + this.storeAckManager = new StoreAckManager(this.messageStore, params.store); + } + + public start(): void { + this.filterAckManager.start(); + this.storeAckManager.start(); + } + + public async stop(): Promise { + await this.filterAckManager.stop(); + this.storeAckManager.stop(); + } + + public async subscribe(codec: ICodec): Promise { + return ( + (await this.filterAckManager.subscribe(codec)) || + (await this.storeAckManager.subscribe(codec)) + ); + } +} + +class FilterAckManager implements IAckManager { + private codecs: Set = new Set(); + + public constructor( + private messageStore: MessageStore, + private filter: IFilter + ) {} + + public start(): void { + return; + } + + public async stop(): Promise { + const promises = Array.from(this.codecs.entries()).map((codec) => + this.filter.unsubscribe(codec) + ); + await Promise.all(promises); + this.codecs.clear(); + } + + public async subscribe(codec: ICodec): Promise { + const success = await this.filter.subscribe( + codec, + this.onMessage.bind(this) + ); + if (success) { + this.codecs.add(codec); + } + return success; + } + + private async onMessage(message: IDecodedMessage): Promise { + if (!this.messageStore.has(message.hashStr)) { + this.messageStore.add(message); + } + + this.messageStore.markFilterAck(message.hashStr); + } +} + +class StoreAckManager implements IAckManager { + private interval: ReturnType | null = null; + + private codecs: Set = new Set(); + + public constructor( + private messageStore: MessageStore, + private store: IStore + ) {} + + public start(): void { + if (this.interval) { + return; + } + + this.interval = setInterval(() => { + void this.query(); + }, 1000); + } + + public stop(): void { + if (!this.interval) { + return; + } + + clearInterval(this.interval); + this.interval = null; + } + + public async subscribe(codec: ICodec): Promise { + this.codecs.add(codec); + return true; + } + + private async query(): Promise { + for (const codec of this.codecs) { + await this.store.queryWithOrderedCallback( + [codec], + (message) => { + if (!this.messageStore.has(message.hashStr)) { + this.messageStore.add(message); + } + + this.messageStore.markStoreAck(message.hashStr); + }, + { + timeStart: new Date(Date.now() - 60 * 60 * 1000), + timeEnd: new Date() + } + ); + } + } +} diff --git a/packages/sdk/src/messaging/fitler_ack.ts b/packages/sdk/src/messaging/fitler_ack.ts deleted file mode 100644 index b5ed7562c3..0000000000 --- a/packages/sdk/src/messaging/fitler_ack.ts +++ /dev/null @@ -1,44 +0,0 @@ -import { IDecodedMessage, IFilter } from "@waku/interfaces"; - -import { MessageStore } from "./message_store.js"; -import { IAckManager, ICodec } from "./utils.js"; - -export class FilterAckManager implements IAckManager { - private codecs: Set = new Set(); - - public constructor( - private messageStore: MessageStore, - private filter: IFilter - ) {} - - public start(): void { - return; - } - - public async stop(): Promise { - const promises = Array.from(this.codecs.entries()).map((codec) => - this.filter.unsubscribe(codec) - ); - await Promise.all(promises); - this.codecs.clear(); - } - - public async subscribe(codec: ICodec): Promise { - const success = await this.filter.subscribe( - codec, - this.onMessage.bind(this) - ); - if (success) { - this.codecs.add(codec); - } - return success; - } - - private async onMessage(message: IDecodedMessage): Promise { - if (!this.messageStore.has(message.hashStr)) { - this.messageStore.add(message); - } - - this.messageStore.markFilterAck(message.hashStr); - } -} diff --git a/packages/sdk/src/messaging/message_store.ts b/packages/sdk/src/messaging/message_store.ts index 910f5cee60..caed532bf7 100644 --- a/packages/sdk/src/messaging/message_store.ts +++ b/packages/sdk/src/messaging/message_store.ts @@ -1,4 +1,4 @@ -import { messageHashStr } from "@waku/core"; +import { message, messageHashStr } from "@waku/core"; import { IDecodedMessage, IEncoder, IMessage } from "@waku/interfaces"; type QueuedMessage = { @@ -14,8 +14,12 @@ type MessageStoreOptions = { resendIntervalMs?: number; }; +type RequestId = string; + export class MessageStore { private readonly messages: Map = new Map(); + private readonly pendingRequests: Map = new Map(); + private readonly resendIntervalMs: number; public constructor(options: MessageStoreOptions = {}) { @@ -40,62 +44,91 @@ export class MessageStore { const entry = this.messages.get(hashStr); if (!entry) return; entry.filterAck = true; + // TODO: implement events } public markStoreAck(hashStr: string): void { const entry = this.messages.get(hashStr); if (!entry) return; entry.storeAck = true; + // TODO: implement events } - public markSent(hashStr: string): void { - const entry = this.messages.get(hashStr); - if (!entry) return; - entry.lastSentAt = Date.now(); + public async markSent(requestId: RequestId): Promise { + const entry = this.pendingRequests.get(requestId); + + if (!entry || !entry.encoder || !entry.message) { + return; + } + + try { + entry.lastSentAt = Date.now(); + this.pendingRequests.delete(requestId); + + const proto = await entry.encoder.toProtoObj(entry.message); + + if (!proto) { + return; + } + + const hashStr = messageHashStr(entry.encoder.pubsubTopic, proto); + + this.messages.set(hashStr, entry); + } catch (error) { + // TODO: better recovery + this.pendingRequests.set(requestId, entry); + } } public async queue( encoder: IEncoder, message: IMessage - ): Promise { - const proto = await encoder.toProtoObj(message); - if (!proto) return undefined; - const hashStr = messageHashStr(encoder.pubsubTopic, proto); - const existing = this.messages.get(hashStr); - if (!existing) { - this.messages.set(hashStr, { - encoder, - message, - filterAck: false, - storeAck: false, - createdAt: Date.now() - }); - } - return hashStr; + ): Promise { + const requestId = crypto.randomUUID(); + + this.pendingRequests.set(requestId, { + encoder, + message, + filterAck: false, + storeAck: false, + createdAt: Date.now() + }); + + return requestId; } public getMessagesToSend(): Array<{ - hashStr: string; + requestId: string; encoder: IEncoder; message: IMessage; }> { const now = Date.now(); + const res: Array<{ - hashStr: string; + requestId: string; encoder: IEncoder; message: IMessage; }> = []; - for (const [hashStr, entry] of this.messages.entries()) { - if (!entry.encoder || !entry.message) continue; - const isAcknowledged = entry.filterAck || entry.storeAck; - if (isAcknowledged) continue; + + for (const [requestId, entry] of this.pendingRequests.entries()) { + if (!entry.encoder || !entry.message) { + continue; + } + + const isAcknowledged = entry.filterAck || entry.storeAck; // TODO: make sure it works with message and pending requests and returns messages to re-sent that are not ack yet + + if (isAcknowledged) { + continue; + } + if ( !entry.lastSentAt || now - entry.lastSentAt >= this.resendIntervalMs ) { - res.push({ hashStr, encoder: entry.encoder, message: entry.message }); + res.push({ requestId, encoder: entry.encoder, message: entry.message }); } } + return res; } } diff --git a/packages/sdk/src/messaging/messaging.ts b/packages/sdk/src/messaging/messaging.ts index a3e058ed07..94e2d72006 100644 --- a/packages/sdk/src/messaging/messaging.ts +++ b/packages/sdk/src/messaging/messaging.ts @@ -6,9 +6,9 @@ import { IStore } from "@waku/interfaces"; -import { FilterAckManager } from "./fitler_ack.js"; +import { AckManager } from "./ack_manager.js"; import { MessageStore } from "./message_store.js"; -import { StoreAckManager } from "./store_ack.js"; +import { Sender } from "./sender.js"; interface IMessaging { send(encoder: IEncoder, message: IMessage): Promise; @@ -21,38 +21,34 @@ type MessagingConstructorParams = { }; export class Messaging implements IMessaging { - private readonly lightPush: ILightPush; private readonly messageStore: MessageStore; - private readonly filterAckManager: FilterAckManager; - private readonly storeAckManager: StoreAckManager; + private readonly ackManager: AckManager; + private readonly sender: Sender; public constructor(params: MessagingConstructorParams) { - this.lightPush = params.lightPush; this.messageStore = new MessageStore(); - this.filterAckManager = new FilterAckManager( - this.messageStore, - params.filter - ); - this.storeAckManager = new StoreAckManager(this.messageStore, params.store); + + this.ackManager = new AckManager({ + messageStore: this.messageStore, + filter: params.filter, + store: params.store + }); + + this.sender = new Sender({ + messageStore: this.messageStore, + lightPush: params.lightPush + }); } public start(): void { - this.filterAckManager.start(); - this.storeAckManager.start(); + this.ackManager.start(); } public async stop(): Promise { - await this.filterAckManager.stop(); - this.storeAckManager.stop(); + await this.ackManager.stop(); } public send(encoder: IEncoder, message: IMessage): Promise { - return (async () => { - const hash = await this.messageStore.queue(encoder, message); - await this.lightPush.send(encoder, message); - if (hash) { - this.messageStore.markSent(hash); - } - })(); + return this.sender.send(encoder, message); } } diff --git a/packages/sdk/src/messaging/sender.ts b/packages/sdk/src/messaging/sender.ts new file mode 100644 index 0000000000..cb21af4192 --- /dev/null +++ b/packages/sdk/src/messaging/sender.ts @@ -0,0 +1,26 @@ +import { IEncoder, ILightPush, IMessage } from "@waku/interfaces"; + +import type { MessageStore } from "./message_store.js"; + +type SenderConstructorParams = { + messageStore: MessageStore; + lightPush: ILightPush; +}; + +export class Sender { + private readonly messageStore: MessageStore; + private readonly lightPush: ILightPush; + + public constructor(params: SenderConstructorParams) { + this.messageStore = params.messageStore; + this.lightPush = params.lightPush; + } + + public async send(encoder: IEncoder, message: IMessage): Promise { + const requestId = await this.messageStore.queue(encoder, message); + await this.lightPush.send(encoder, message); + if (requestId) { + await this.messageStore.markSent(requestId); + } + } +} diff --git a/packages/sdk/src/messaging/store_ack.ts b/packages/sdk/src/messaging/store_ack.ts deleted file mode 100644 index 94511d1c92..0000000000 --- a/packages/sdk/src/messaging/store_ack.ts +++ /dev/null @@ -1,58 +0,0 @@ -import { IStore } from "@waku/interfaces"; - -import { MessageStore } from "./message_store.js"; -import { IAckManager, ICodec } from "./utils.js"; - -export class StoreAckManager implements IAckManager { - private interval: ReturnType | null = null; - - private codecs: Set = new Set(); - - public constructor( - private messageStore: MessageStore, - private store: IStore - ) {} - - public start(): void { - if (this.interval) { - return; - } - - this.interval = setInterval(() => { - void this.query(); - }, 1000); - } - - public stop(): void { - if (!this.interval) { - return; - } - - clearInterval(this.interval); - this.interval = null; - } - - public async subscribe(codec: ICodec): Promise { - this.codecs.add(codec); - return true; - } - - private async query(): Promise { - for (const codec of this.codecs) { - await this.store.queryWithOrderedCallback( - [codec], - (message) => { - if (!this.messageStore.has(message.hashStr)) { - this.messageStore.add(message); - } - - this.messageStore.markStoreAck(message.hashStr); - }, - { - timeStart: new Date(Date.now() - 60 * 60 * 1000), - timeEnd: new Date() - } - ); - } - } -}