diff --git a/packages/sdk/src/messaging/messaging.ts b/packages/sdk/src/messaging/messaging.ts index 1324961d36..f68708311b 100644 --- a/packages/sdk/src/messaging/messaging.ts +++ b/packages/sdk/src/messaging/messaging.ts @@ -1,6 +1,6 @@ -import { messageHashStr } from "@waku/core"; import { IDecodedMessage, + IDecoder, IEncoder, IFilter, ILightPush, @@ -37,7 +37,7 @@ class MessageStore { // -> sent more than 2s ago but not acked yet (store or filter) } -type ICodec = null; +type ICodec = IEncoder & IDecoder; interface IAckManager { start(): void; @@ -46,42 +46,88 @@ interface IAckManager { } class FilterAckManager implements IAckManager { - private subscriptions: Set = new Set(); + private codecs: Set = new Set(); public constructor( private messageStore: MessageStore, private filter: IFilter ) {} - public start(): void {} + public start(): void { + // noop + } - public stop(): void {} + public async stop(): Promise { + const promises = Array.from(this.codecs.entries()).map((codec) => { + return this.filter.unsubscribe(codec); + }); + + await Promise.all(promises); + this.codecs.clear(); + } public async subscribe(codec: ICodec): Promise { return this.filter.subscribe(codec, this.onMessage.bind(this)); } private async onMessage(message: IDecodedMessage): Promise { - const hash = messageHashStr(message.pubsubTopic, message); - - if (this.messageStore.has(message)) { - this.messageStore.markFilterAck(hash); - } else { - this.messageStore.put(message); - this.messageStore.markFilterAck(hash); + if (!this.messageStore.has(message.hashStr)) { + this.messageStore.add(message); } + + this.messageStore.markFilterAck(message.hashStr); } } class StoreAckManager implements IAckManager { + private interval: ReturnType | null = null; + + private codecs: Set = new Set(); + public constructor( private messageStore: MessageStore, private store: IStore ) {} - public start(): void {} + public start(): void { + if (this.interval) { + return; + } - public stop(): void {} + this.interval = setInterval(() => { + void this.query(); + }, 1000); + } - public subscribe(codec: ICodec): void {} + public stop(): void { + if (!this.interval) { + return; + } + + clearInterval(this.interval); + this.interval = null; + } + + public subscribe(codec: ICodec): void { + this.codecs.add(codec); + } + + private async query(): Promise { + for (const codec of this.codecs) { + await this.store.queryWithOrderedCallback( + [codec], + (message) => { + if (!this.messageStore.has(message.hashStr)) { + this.messageStore.add(message); + } + + this.messageStore.markStoreAck(message.hashStr); + }, + { + timeStart: new Date(Date.now() - 60 * 60 * 1000), + timeEnd: new Date() + } + ); + } + } }