diff --git a/packages/sds/src/message_channel/mem_local_history.spec.ts b/packages/sds/src/message_channel/mem_local_history.spec.ts index d57e46493a..68a8d2f4f3 100644 --- a/packages/sds/src/message_channel/mem_local_history.spec.ts +++ b/packages/sds/src/message_channel/mem_local_history.spec.ts @@ -9,23 +9,23 @@ describe("MemLocalHistory", () => { const hist = new MemLocalHistory({ maxSize: maxSize }); - hist.push( + hist.addMessages( new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) ); - expect(hist.length).to.eq(1); - hist.push( + expect(hist.size).to.eq(1); + hist.addMessages( new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2])) ); - expect(hist.length).to.eq(2); + expect(hist.size).to.eq(2); - hist.push( + hist.addMessages( new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3])) ); - expect(hist.length).to.eq(2); + expect(hist.size).to.eq(2); - expect(hist.findIndex((m) => m.messageId === "1")).to.eq(-1); - expect(hist.findIndex((m) => m.messageId === "2")).to.not.eq(-1); - expect(hist.findIndex((m) => m.messageId === "3")).to.not.eq(-1); + expect(hist.hasMessage("1")).to.eq(false); + expect(hist.hasMessage("2")).to.eq(true); + expect(hist.hasMessage("3")).to.eq(true); }); it("Cap max size when a pushed array is exceeding the cap", () => { @@ -33,18 +33,18 @@ describe("MemLocalHistory", () => { const hist = new MemLocalHistory({ maxSize: maxSize }); - hist.push( + hist.addMessages( new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) ); - expect(hist.length).to.eq(1); - hist.push( + expect(hist.size).to.eq(1); + hist.addMessages( new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2])), new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3])) ); - expect(hist.length).to.eq(2); + expect(hist.size).to.eq(2); - expect(hist.findIndex((m) => m.messageId === "1")).to.eq(-1); - expect(hist.findIndex((m) => m.messageId === "2")).to.not.eq(-1); - expect(hist.findIndex((m) => m.messageId === "3")).to.not.eq(-1); + expect(hist.hasMessage("1")).to.eq(false); + expect(hist.hasMessage("2")).to.eq(true); + expect(hist.hasMessage("3")).to.eq(true); }); }); diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts index 109ffd8ad3..b0c06b3f21 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -1,7 +1,12 @@ import { Logger } from "@waku/utils"; import _ from "lodash"; -import { type ChannelId, ContentMessage, isContentMessage } from "./message.js"; +import { + type ChannelId, + ContentMessage, + type HistoryEntry, + isContentMessage +} from "./message.js"; import { PersistentStorage } from "./persistent_storage.js"; export const DEFAULT_MAX_LENGTH = 10_000; @@ -20,33 +25,13 @@ export const DEFAULT_MAX_LENGTH = 10_000; * at next push. */ export interface ILocalHistory { - length: number; - push(...items: ContentMessage[]): number; - some( - predicate: ( - value: ContentMessage, - index: number, - array: ContentMessage[] - ) => unknown, - thisArg?: any - ): boolean; - slice(start?: number, end?: number): ContentMessage[]; - find( - predicate: ( - value: ContentMessage, - index: number, - obj: ContentMessage[] - ) => unknown, - thisArg?: any - ): ContentMessage | undefined; - findIndex( - predicate: ( - value: ContentMessage, - index: number, - obj: ContentMessage[] - ) => unknown, - thisArg?: any - ): number; + readonly size: number; + addMessages(...messages: ContentMessage[]): void; + hasMessage(messageId: string): boolean; + getMessage(messageId: string): ContentMessage | undefined; + getRecentMessages(count: number): ContentMessage[]; + getAllMessages(): ContentMessage[]; + findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[]; } export type MemLocalHistoryOptions = { @@ -58,6 +43,7 @@ const log = new Logger("sds:local-history"); export class MemLocalHistory implements ILocalHistory { private items: ContentMessage[] = []; + private messageIndex: Map = new Map(); private readonly storage?: PersistentStorage; private readonly maxSize: number; @@ -85,18 +71,18 @@ export class MemLocalHistory implements ILocalHistory { this.load(); } - public get length(): number { + public get size(): number { return this.items.length; } - public push(...items: ContentMessage[]): number { - for (const item of items) { - this.validateMessage(item); + public addMessages(...messages: ContentMessage[]): void { + for (const message of messages) { + this.validateMessage(message); } // Add new items and sort by timestamp, ensuring uniqueness by messageId // The valueOf() method on ContentMessage enables native < operator sorting - const combinedItems = [...this.items, ...items]; + const combinedItems = [...this.items, ...messages]; // Sort by timestamp (using valueOf() which creates timestamp_messageId string) combinedItems.sort((a, b) => a.valueOf().localeCompare(b.valueOf())); @@ -104,52 +90,45 @@ export class MemLocalHistory implements ILocalHistory { // Remove duplicates by messageId while maintaining order this.items = _.uniqBy(combinedItems, "messageId"); + this.rebuildIndex(); + // Let's drop older messages if max length is reached - if (this.length > this.maxSize) { - const numItemsToRemove = this.length - this.maxSize; - this.items.splice(0, numItemsToRemove); + if (this.size > this.maxSize) { + const numItemsToRemove = this.size - this.maxSize; + const removedItems = this.items.splice(0, numItemsToRemove); + for (const item of removedItems) { + this.messageIndex.delete(item.messageId); + } } this.save(); - - return this.items.length; } - public some( - predicate: ( - value: ContentMessage, - index: number, - array: ContentMessage[] - ) => unknown, - thisArg?: any - ): boolean { - return this.items.some(predicate, thisArg); + public hasMessage(messageId: string): boolean { + return this.messageIndex.has(messageId); } - public slice(start?: number, end?: number): ContentMessage[] { - return this.items.slice(start, end); + public getRecentMessages(count: number): ContentMessage[] { + return this.items.slice(-count); } - public find( - predicate: ( - value: ContentMessage, - index: number, - obj: ContentMessage[] - ) => unknown, - thisArg?: any - ): ContentMessage | undefined { - return this.items.find(predicate, thisArg); + public getAllMessages(): ContentMessage[] { + return [...this.items]; } - public findIndex( - predicate: ( - value: ContentMessage, - index: number, - obj: ContentMessage[] - ) => unknown, - thisArg?: any - ): number { - return this.items.findIndex(predicate, thisArg); + public getMessage(messageId: string): ContentMessage | undefined { + return this.messageIndex.get(messageId); + } + + public findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[] { + return entries.filter((entry) => !this.messageIndex.has(entry.messageId)); + } + + private rebuildIndex(): void { + this.messageIndex.clear(); + for (const message of this.items) { + this.messageIndex.set(message.messageId, message); + } } private validateMessage(message: ContentMessage): void { @@ -168,6 +147,7 @@ export class MemLocalHistory implements ILocalHistory { const messages = this.storage?.load() ?? []; if (messages.length > 0) { this.items = messages; + this.rebuildIndex(); } } } diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index ed66f10abf..a81446e762 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -118,14 +118,10 @@ describe("MessageChannel", function () { const messageId = MessageChannel.getMessageId(payload); await sendMessage(channelA, payload, callback); const messageIdLog = channelA["localHistory"] as ILocalHistory; - expect(messageIdLog.length).to.equal(1); - expect( - messageIdLog.some( - (log) => - log.lamportTimestamp === expectedTimestamp && - log.messageId === messageId - ) - ).to.equal(true); + expect(messageIdLog.size).to.equal(1); + const msg = messageIdLog.getMessage(messageId); + expect(msg).to.exist; + expect(msg!.lamportTimestamp).to.equal(expectedTimestamp); }); it("should add sent message to localHistory with retrievalHint", async () => { @@ -139,12 +135,10 @@ describe("MessageChannel", function () { }); const localHistory = channelA["localHistory"] as ILocalHistory; - expect(localHistory.length).to.equal(1); + expect(localHistory.size).to.equal(1); // Find the message in local history - const historyEntry = localHistory.find( - (entry) => entry.messageId === messageId - ); + const historyEntry = localHistory.getMessage(messageId); expect(historyEntry).to.exist; expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint); }); @@ -296,11 +290,9 @@ describe("MessageChannel", function () { // Message should not be in local history const localHistory = channelB["localHistory"]; - expect( - localHistory.some( - ({ messageId }) => messageId === receivedMessage!.messageId - ) - ).to.equal(false); + expect(localHistory.hasMessage(receivedMessage!.messageId)).to.equal( + false + ); }); it("should add received message to localHistory with retrievalHint", async () => { @@ -325,12 +317,10 @@ describe("MessageChannel", function () { ); const localHistory = channelA["localHistory"] as ILocalHistory; - expect(localHistory.length).to.equal(1); + expect(localHistory.size).to.equal(1); // Find the message in local history - const historyEntry = localHistory.find( - (entry) => entry.messageId === messageId - ); + const historyEntry = localHistory.getMessage(messageId); expect(historyEntry).to.exist; expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint); }); @@ -379,35 +369,35 @@ describe("MessageChannel", function () { ); const localHistory = channelA["localHistory"]; - expect(localHistory.length).to.equal(3); + expect(localHistory.size).to.equal(3); // Verify chronological order: message1 (ts=1), message2 (ts=2), message3 (ts=3) - const first = localHistory.findIndex( - ({ messageId, lamportTimestamp }) => { + const first = localHistory + .getAllMessages() + .findIndex(({ messageId, lamportTimestamp }) => { return ( messageId === message1Id && lamportTimestamp === startTimestamp + 1n ); - } - ); + }); expect(first).to.eq(0); - const second = localHistory.findIndex( - ({ messageId, lamportTimestamp }) => { + const second = localHistory + .getAllMessages() + .findIndex(({ messageId, lamportTimestamp }) => { return ( messageId === message2Id && lamportTimestamp === startTimestamp + 2n ); - } - ); + }); expect(second).to.eq(1); - const third = localHistory.findIndex( - ({ messageId, lamportTimestamp }) => { + const third = localHistory + .getAllMessages() + .findIndex(({ messageId, lamportTimestamp }) => { return ( messageId === message3Id && lamportTimestamp === startTimestamp + 3n ); - } - ); + }); expect(third).to.eq(2); }); @@ -447,24 +437,24 @@ describe("MessageChannel", function () { ); const localHistory = channelA["localHistory"] as ILocalHistory; - expect(localHistory.length).to.equal(2); + expect(localHistory.size).to.equal(2); // When timestamps are equal, should be ordered by messageId lexicographically // The valueOf() method creates "000000000000005_messageId" for comparison const expectedOrder = [message1Id, message2Id].sort(); - const first = localHistory.findIndex( - ({ messageId, lamportTimestamp }) => { + const first = localHistory + .getAllMessages() + .findIndex(({ messageId, lamportTimestamp }) => { return messageId === expectedOrder[0] && lamportTimestamp == 5n; - } - ); + }); expect(first).to.eq(0); - const second = localHistory.findIndex( - ({ messageId, lamportTimestamp }) => { + const second = localHistory + .getAllMessages() + .findIndex(({ messageId, lamportTimestamp }) => { return messageId === expectedOrder[1] && lamportTimestamp == 5n; - } - ); + }); expect(second).to.eq(1); }); }); @@ -1112,7 +1102,7 @@ describe("MessageChannel", function () { }); channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); const message = utf8ToBytes("first message in channel"); - channelA["localHistory"].push( + channelA["localHistory"].addMessages( new ContentMessage( MessageChannel.getMessageId(message), "MyChannel", @@ -1156,7 +1146,7 @@ describe("MessageChannel", function () { ).to.equal(false); const localLog = channelA["localHistory"]; - expect(localLog.length).to.equal(1); // beforeEach adds one message + expect(localLog.size).to.equal(1); // beforeEach adds one message }); it("should not be delivered", async () => { @@ -1170,7 +1160,7 @@ describe("MessageChannel", function () { expect(timestampAfter).to.equal(timestampBefore); const localLog = channelB["localHistory"]; - expect(localLog.length).to.equal(0); + expect(localLog.size).to.equal(0); const bloomFilter = getBloomFilter(channelB); expect( @@ -1230,7 +1220,7 @@ describe("MessageChannel", function () { const channelB = createTestChannel(channelId, "bob"); // Track initial state - const localHistoryBefore = channelB["localHistory"].length; + const localHistoryBefore = channelB["localHistory"].size; const incomingBufferBefore = channelB["incomingBuffer"].length; const timestampBefore = channelB["lamportTimestamp"]; @@ -1248,7 +1238,7 @@ describe("MessageChannel", function () { // Verify ephemeral message behavior: // 1. Not added to local history - expect(channelB["localHistory"].length).to.equal(localHistoryBefore); + expect(channelB["localHistory"].size).to.equal(localHistoryBefore); // 2. Not added to incoming buffer expect(channelB["incomingBuffer"].length).to.equal(incomingBufferBefore); // 3. Doesn't update lamport timestamp @@ -1272,14 +1262,14 @@ describe("MessageChannel", function () { await sendMessage(channel1, utf8ToBytes("msg-1"), callback); await sendMessage(channel1, utf8ToBytes("msg-2"), callback); - expect(channel1["localHistory"].length).to.equal(2); + expect(channel1["localHistory"].size).to.equal(2); // Recreate channel with same storage - should load history const channel2 = new MessageChannel(persistentChannelId, "alice"); - expect(channel2["localHistory"].length).to.equal(2); + expect(channel2["localHistory"].size).to.equal(2); expect( - channel2["localHistory"].slice(0).map((m) => m.messageId) + channel2["localHistory"].getAllMessages().map((m) => m.messageId) ).to.deep.equal([ MessageChannel.getMessageId(utf8ToBytes("msg-1")), MessageChannel.getMessageId(utf8ToBytes("msg-2")) diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 8b23809627..88ebe3c6d7 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -297,9 +297,8 @@ export class MessageChannel extends TypedEventEmitter { message.messageId, message.causalHistory.map((ch) => ch.messageId) ); - const missingDependencies = message.causalHistory.filter( - (messageHistoryEntry) => - !this.isMessageAvailable(messageHistoryEntry.messageId) + const missingDependencies = this.findMissingDependencies( + message.causalHistory ); if (missingDependencies.length === 0) { if (isContentMessage(message) && this.deliverMessage(message)) { @@ -454,7 +453,7 @@ export class MessageChannel extends TypedEventEmitter { this.channelId, this.senderId, this.localHistory - .slice(-this.causalHistorySize) + .getRecentMessages(this.causalHistorySize) .map(({ messageId, retrievalHint, senderId }) => { return { messageId, retrievalHint, senderId }; }), @@ -583,9 +582,8 @@ export class MessageChannel extends TypedEventEmitter { this.filter.insert(message.messageId); } - const missingDependencies = message.causalHistory.filter( - (messageHistoryEntry) => - !this.isMessageAvailable(messageHistoryEntry.messageId) + const missingDependencies = this.findMissingDependencies( + message.causalHistory ); if (missingDependencies.length > 0) { @@ -676,7 +674,7 @@ export class MessageChannel extends TypedEventEmitter { this.channelId, this.senderId, this.localHistory - .slice(-this.causalHistorySize) + .getRecentMessages(this.causalHistorySize) .map(({ messageId, retrievalHint, senderId }) => { return { messageId, retrievalHint, senderId }; }), @@ -699,7 +697,7 @@ export class MessageChannel extends TypedEventEmitter { if (success && isContentMessage(message)) { message.retrievalHint = retrievalHint; this.filter.insert(messageId); - this.localHistory.push(message); + this.localHistory.addMessages(message); this.timeReceived.set(messageId, Date.now()); this.safeSendEvent(MessageChannelEvent.OutMessageSent, { detail: message @@ -739,24 +737,15 @@ export class MessageChannel extends TypedEventEmitter { } } - /** - * Check if a message is available (either in localHistory or incomingBuffer) - * This prevents treating messages as "missing" when they've already been received - * but are waiting in the incoming buffer for their dependencies. - * - * @param messageId - The ID of the message to check - * @private - */ - private isMessageAvailable(messageId: MessageId): boolean { - // Check if in local history - if (this.localHistory.some((m) => m.messageId === messageId)) { - return true; - } - // Check if in incoming buffer (already received, waiting for dependencies) - if (this.incomingBuffer.some((m) => m.messageId === messageId)) { - return true; - } - return false; + private findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[] { + const missingFromHistory = + this.localHistory.findMissingDependencies(entries); + + const incomingIds = new Set(this.incomingBuffer.map((m) => m.messageId)); + + return missingFromHistory.filter( + (entry) => !incomingIds.has(entry.messageId) + ); } /** @@ -786,8 +775,8 @@ export class MessageChannel extends TypedEventEmitter { } // Check if the entry is already present - const existingHistoryEntry = this.localHistory.find( - ({ messageId }) => messageId === message.messageId + const existingHistoryEntry = this.localHistory.getMessage( + message.messageId ); // The history entry is already present, no need to re-add @@ -799,7 +788,7 @@ export class MessageChannel extends TypedEventEmitter { log.warn("message delivered without a retrieval hint", message.messageId); } - this.localHistory.push(message); + this.localHistory.addMessages(message); return true; } diff --git a/packages/sds/src/message_channel/persistent_storage.spec.ts b/packages/sds/src/message_channel/persistent_storage.spec.ts index ec7547dab0..702af3aed1 100644 --- a/packages/sds/src/message_channel/persistent_storage.spec.ts +++ b/packages/sds/src/message_channel/persistent_storage.spec.ts @@ -15,27 +15,26 @@ describe("PersistentStorage", () => { expect(persistentStorage).to.not.be.undefined; const history1 = new MemLocalHistory({ storage: persistentStorage }); - history1.push(createMessage("msg-1", 1)); - history1.push(createMessage("msg-2", 2)); + history1.addMessages(createMessage("msg-1", 1)); + history1.addMessages(createMessage("msg-2", 2)); const history2 = new MemLocalHistory({ storage: persistentStorage }); - expect(history2.length).to.equal(2); - expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ - "msg-1", - "msg-2" - ]); + expect(history2.size).to.equal(2); + expect( + history2.getAllMessages().map((msg) => msg.messageId) + ).to.deep.equal(["msg-1", "msg-2"]); }); it("uses in-memory only when no storage is provided", () => { const history = new MemLocalHistory({ maxSize: 100 }); - history.push(createMessage("msg-3", 3)); + history.addMessages(createMessage("msg-3", 3)); - expect(history.length).to.equal(1); - expect(history.slice(0)[0].messageId).to.equal("msg-3"); + expect(history.size).to.equal(1); + expect(history.getAllMessages()[0].messageId).to.equal("msg-3"); const history2 = new MemLocalHistory({ maxSize: 100 }); - expect(history2.length).to.equal(0); + expect(history2.size).to.equal(0); }); it("handles corrupt data in storage gracefully", () => { @@ -46,7 +45,7 @@ describe("PersistentStorage", () => { const persistentStorage = PersistentStorage.create(channelId, storage); const history = new MemLocalHistory({ storage: persistentStorage }); - expect(history.length).to.equal(0); + expect(history.size).to.equal(0); // Corrupt data is not saved expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null); @@ -61,14 +60,14 @@ describe("PersistentStorage", () => { const history1 = new MemLocalHistory({ storage: storage1 }); const history2 = new MemLocalHistory({ storage: storage2 }); - history1.push(createMessage("msg-1", 1)); - history2.push(createMessage("msg-2", 2)); + history1.addMessages(createMessage("msg-1", 1)); + history2.addMessages(createMessage("msg-2", 2)); - expect(history1.length).to.equal(1); - expect(history1.slice(0)[0].messageId).to.equal("msg-1"); + expect(history1.size).to.equal(1); + expect(history1.getAllMessages()[0].messageId).to.equal("msg-1"); - expect(history2.length).to.equal(1); - expect(history2.slice(0)[0].messageId).to.equal("msg-2"); + expect(history2.size).to.equal(1); + expect(history2.getAllMessages()[0].messageId).to.equal("msg-2"); expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null; @@ -81,7 +80,7 @@ describe("PersistentStorage", () => { expect(storage.getItem("waku:sds:history:channel-1")).to.be.null; - history.push(createMessage("msg-1", 1)); + history.addMessages(createMessage("msg-1", 1)); expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; @@ -95,15 +94,15 @@ describe("PersistentStorage", () => { const persistentStorage1 = PersistentStorage.create(channelId, storage); const history1 = new MemLocalHistory({ storage: persistentStorage1 }); - history1.push(createMessage("msg-1", 1)); - history1.push(createMessage("msg-2", 2)); - history1.push(createMessage("msg-3", 3)); + history1.addMessages(createMessage("msg-1", 1)); + history1.addMessages(createMessage("msg-2", 2)); + history1.addMessages(createMessage("msg-3", 3)); const persistentStorage2 = PersistentStorage.create(channelId, storage); const history2 = new MemLocalHistory({ storage: persistentStorage2 }); - expect(history2.length).to.equal(3); - expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ + expect(history2.size).to.equal(3); + expect(history2.getAllMessages().map((m) => m.messageId)).to.deep.equal([ "msg-1", "msg-2", "msg-3" @@ -135,16 +134,15 @@ describe("PersistentStorage", () => { it("persists and restores messages with channelId", () => { const testChannelId = `test-${Date.now()}`; const history1 = new MemLocalHistory({ storage: testChannelId }); - history1.push(createMessage("msg-1", 1)); - history1.push(createMessage("msg-2", 2)); + history1.addMessages(createMessage("msg-1", 1)); + history1.addMessages(createMessage("msg-2", 2)); const history2 = new MemLocalHistory({ storage: testChannelId }); - expect(history2.length).to.equal(2); - expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ - "msg-1", - "msg-2" - ]); + expect(history2.size).to.equal(2); + expect( + history2.getAllMessages().map((msg) => msg.messageId) + ).to.deep.equal(["msg-1", "msg-2"]); localStorage.removeItem(`waku:sds:history:${testChannelId}`); }); @@ -153,12 +151,12 @@ describe("PersistentStorage", () => { const testChannelId = `auto-storage-${Date.now()}`; const history = new MemLocalHistory({ storage: testChannelId }); - history.push(createMessage("msg-auto-1", 1)); - history.push(createMessage("msg-auto-2", 2)); + history.addMessages(createMessage("msg-auto-1", 1)); + history.addMessages(createMessage("msg-auto-2", 2)); const history2 = new MemLocalHistory({ storage: testChannelId }); - expect(history2.length).to.equal(2); - expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ + expect(history2.size).to.equal(2); + expect(history2.getAllMessages().map((m) => m.messageId)).to.deep.equal([ "msg-auto-1", "msg-auto-2" ]); diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 1a8a85f43e..8eb13f8c64 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -1,8 +1,8 @@ import { Logger } from "@waku/utils"; +import type { ILocalHistory } from "../mem_local_history.js"; import type { HistoryEntry, MessageId } from "../message.js"; import { Message } from "../message.js"; -import type { ILocalHistory } from "../message_channel.js"; import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js"; import { @@ -191,9 +191,7 @@ export class RepairManager { this.outgoingBuffer.remove(request.messageId); // Check if we have this message - const message = localHistory.find( - (m) => m.messageId === request.messageId - ); + const message = localHistory.getMessage(request.messageId); if (!message) { log.info( `Cannot fulfill repair for ${request.messageId} - not in local history` @@ -255,7 +253,7 @@ export class RepairManager { const messages: Message[] = []; for (const entry of ready) { - const message = localHistory.find((m) => m.messageId === entry.messageId); + const message = localHistory.getMessage(entry.messageId); if (message) { messages.push(message); log.info(`Sending repair for ${entry.messageId}`);