diff --git a/packages/sds/src/message_channel/events.ts b/packages/sds/src/message_channel/events.ts index 0a40dce2bb..ecc2a55edc 100644 --- a/packages/sds/src/message_channel/events.ts +++ b/packages/sds/src/message_channel/events.ts @@ -1,4 +1,4 @@ -import { HistoryEntry, Message, MessageId } from "./message.js"; +import { HistoryEntry, Message, MessageId, ParticipantId } from "./message.js"; export enum MessageChannelEvent { OutMessageSent = "sds:out:message-sent", diff --git a/packages/sds/src/message_channel/message.ts b/packages/sds/src/message_channel/message.ts index d820a10445..1d45b96077 100644 --- a/packages/sds/src/message_channel/message.ts +++ b/packages/sds/src/message_channel/message.ts @@ -102,7 +102,7 @@ export class SyncMessage extends Message { public lamportTimestamp: bigint, public bloomFilter: Uint8Array | undefined, public content: undefined, - public override repairRequest: proto_sds_message.HistoryEntry[] = [], + public repairRequest: proto_sds_message.HistoryEntry[] = [], /** * Not encoded, set after it is sent, used to include in follow-up messages */ @@ -148,7 +148,7 @@ export class EphemeralMessage extends Message { public lamportTimestamp: undefined, public bloomFilter: Uint8Array | undefined, public content: Uint8Array, - public override repairRequest: proto_sds_message.HistoryEntry[] = [], + public repairRequest: proto_sds_message.HistoryEntry[] = [], /** * Not encoded, set after it is sent, used to include in follow-up messages */ @@ -198,7 +198,7 @@ export class ContentMessage extends Message { public lamportTimestamp: bigint, public bloomFilter: Uint8Array | undefined, public content: Uint8Array, - public override repairRequest: proto_sds_message.HistoryEntry[] = [], + public repairRequest: proto_sds_message.HistoryEntry[] = [], /** * Not encoded, set after it is sent, used to include in follow-up messages */ diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 5a50193072..d7602f91f0 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -542,7 +542,7 @@ export class MessageChannel extends TypedEventEmitter { } // SDS-R: Handle received message in repair manager - this.repairManager.onMessageReceived(message.messageId); + this.repairManager.markMessageReceived(message.messageId); // SDS-R: Process incoming repair requests if (message.repairRequest && message.repairRequest.length > 0) { @@ -582,7 +582,7 @@ export class MessageChannel extends TypedEventEmitter { ); // SDS-R: Track missing dependencies in repair manager - this.repairManager.onMissingDependencies(missingDependencies); + this.repairManager.markDependenciesMissing(missingDependencies); this.safeSendEvent(MessageChannelEvent.InMessageMissing, { detail: Array.from(missingDependencies) diff --git a/packages/sds/src/message_channel/repair/buffers.ts b/packages/sds/src/message_channel/repair/buffers.ts index 5f1102e3f6..b2c45f3f5e 100644 --- a/packages/sds/src/message_channel/repair/buffers.ts +++ b/packages/sds/src/message_channel/repair/buffers.ts @@ -43,10 +43,7 @@ export class OutgoingRepairBuffer { const messageId = entry.messageId; // Check if already exists - do NOT update T_req per spec - const existingIndex = this.items.findIndex( - (item) => item.entry.messageId === messageId - ); - if (existingIndex !== -1) { + if (this.has(messageId)) { log.info( `Message ${messageId} already in outgoing buffer, keeping original T_req` ); @@ -55,7 +52,7 @@ export class OutgoingRepairBuffer { // Check buffer size limit if (this.items.length >= this.maxSize) { - // Evict furthest T_req entry (last in sorted array) to preserve repairs that need to be sent the soonest + // Evict furthest T_req entry (last in sorted array) to preserve repairs that need to be sent the soonest const evicted = this.items.pop()!; log.warn( `Buffer full, evicted furthest entry ${evicted.entry.messageId} with T_req ${evicted.tReq}` @@ -88,7 +85,10 @@ export class OutgoingRepairBuffer { * Returns up to maxRequests entries from the front of the sorted array * Marks returned entries as requested but keeps them in buffer until received */ - public getEligible(currentTime: number = Date.now(), maxRequests = 3): HistoryEntry[] { + public getEligible( + currentTime: number = Date.now(), + maxRequests = 3 + ): HistoryEntry[] { const eligible: HistoryEntry[] = []; // Iterate from front of sorted array (earliest T_req first) @@ -176,10 +176,7 @@ export class IncomingRepairBuffer { const messageId = entry.messageId; // Check if already exists - ignore per spec - const existingIndex = this.items.findIndex( - (item) => item.entry.messageId === messageId - ); - if (existingIndex !== -1) { + if (this.has(messageId)) { log.info(`Message ${messageId} already in incoming buffer, ignoring`); return false; } diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 099954bd08..b61a6ca51b 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -1,6 +1,6 @@ import { Logger } from "@waku/utils"; -import type { HistoryEntry } from "../message.js"; +import type { HistoryEntry, MessageId } from "../message.js"; import { Message } from "../message.js"; import type { ILocalHistory } from "../message_channel.js";