From 42bfd98861903a7e686df774d9877ed3392c719a Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Fri, 28 Nov 2025 10:56:34 -0500 Subject: [PATCH] refactor: abstract `messageIndex` behind `ILocalHistory` as `findMissingDependencies` --- .../src/message_channel/mem_local_history.ts | 12 ++++++++++- .../src/message_channel/message_channel.ts | 21 +++++++++++++------ 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts index 8df3830d3c..b0c06b3f21 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -1,7 +1,12 @@ import { Logger } from "@waku/utils"; import _ from "lodash"; -import { type ChannelId, ContentMessage, isContentMessage } from "./message.js"; +import { + type ChannelId, + ContentMessage, + type HistoryEntry, + isContentMessage +} from "./message.js"; import { PersistentStorage } from "./persistent_storage.js"; export const DEFAULT_MAX_LENGTH = 10_000; @@ -26,6 +31,7 @@ export interface ILocalHistory { getMessage(messageId: string): ContentMessage | undefined; getRecentMessages(count: number): ContentMessage[]; getAllMessages(): ContentMessage[]; + findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[]; } export type MemLocalHistoryOptions = { @@ -114,6 +120,10 @@ export class MemLocalHistory implements ILocalHistory { return this.messageIndex.get(messageId); } + public findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[] { + return entries.filter((entry) => !this.messageIndex.has(entry.messageId)); + } + private rebuildIndex(): void { this.messageIndex.clear(); for (const message of this.items) { diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index a421eb35ab..b5a2f4b151 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -297,9 +297,8 @@ export class MessageChannel extends TypedEventEmitter { message.messageId, message.causalHistory.map((ch) => ch.messageId) ); - const missingDependencies = message.causalHistory.filter( - (messageHistoryEntry) => - !this.isMessageAvailable(messageHistoryEntry.messageId) + const missingDependencies = this.findMissingDependencies( + message.causalHistory ); if (missingDependencies.length === 0) { if (isContentMessage(message) && this.deliverMessage(message)) { @@ -583,9 +582,8 @@ export class MessageChannel extends TypedEventEmitter { this.filter.insert(message.messageId); } - const missingDependencies = message.causalHistory.filter( - (messageHistoryEntry) => - !this.isMessageAvailable(messageHistoryEntry.messageId) + const missingDependencies = this.findMissingDependencies( + message.causalHistory ); if (missingDependencies.length > 0) { @@ -739,6 +737,17 @@ export class MessageChannel extends TypedEventEmitter { } } + private findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[] { + const missingFromHistory = + this.localHistory.findMissingDependencies(entries); + + const incomingIds = new Set(this.incomingBuffer.map((m) => m.messageId)); + + return missingFromHistory.filter( + (entry) => !incomingIds.has(entry.messageId) + ); + } + /** * Check if a message is available (either in localHistory or incomingBuffer) * This prevents treating messages as "missing" when they've already been received