From febd7cbd2ebfb67225b862ffa3589c5a57b64a8e Mon Sep 17 00:00:00 2001 From: jm-clius Date: Thu, 23 Oct 2025 12:09:31 +0100 Subject: [PATCH] fix: fixed buffer handling incoming and outgoing --- .../src/message_channel/message_channel.ts | 28 ++++-- .../sds/src/message_channel/repair/buffers.ts | 96 ++++++++++++------- .../sds/src/message_channel/repair/repair.ts | 44 +++++---- 3 files changed, 107 insertions(+), 61 deletions(-) diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index fd374fd95f..00b13311c2 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -285,9 +285,7 @@ export class MessageChannel extends TypedEventEmitter { ); const missingDependencies = message.causalHistory.filter( (messageHistoryEntry) => - !this.localHistory.some( - ({ messageId }) => messageId === messageHistoryEntry.messageId - ) + !this.isMessageAvailable(messageHistoryEntry.messageId) ); if (missingDependencies.length === 0) { if (isContentMessage(message) && this.deliverMessage(message)) { @@ -562,9 +560,7 @@ export class MessageChannel extends TypedEventEmitter { const missingDependencies = message.causalHistory.filter( (messageHistoryEntry) => - !this.localHistory.some( - ({ messageId }) => messageId === messageHistoryEntry.messageId - ) + !this.isMessageAvailable(messageHistoryEntry.messageId) ); if (missingDependencies.length > 0) { @@ -715,6 +711,26 @@ export class MessageChannel extends TypedEventEmitter { } } + /** + * Check if a message is available (either in localHistory or incomingBuffer) + * This prevents treating messages as "missing" when they've already been received + * but are waiting in the incoming buffer for their dependencies. + * + * @param messageId - The ID of the message to check + * @private + */ + private isMessageAvailable(messageId: MessageId): boolean { + // Check if in local history + if (this.localHistory.some((m) => m.messageId === messageId)) { + return true; + } + // Check if in incoming buffer (already received, waiting for dependencies) + if (this.incomingBuffer.some((m) => m.messageId === messageId)) { + return true; + } + return false; + } + /** * Return true if the message was "delivered" * diff --git a/packages/sds/src/message_channel/repair/buffers.ts b/packages/sds/src/message_channel/repair/buffers.ts index caadd97c8f..8f405961d7 100644 --- a/packages/sds/src/message_channel/repair/buffers.ts +++ b/packages/sds/src/message_channel/repair/buffers.ts @@ -1,5 +1,4 @@ import { Logger } from "@waku/utils"; -import _ from "lodash"; import type { HistoryEntry } from "../message.js"; @@ -11,6 +10,7 @@ const log = new Logger("sds:repair:buffers"); interface OutgoingBufferEntry { entry: HistoryEntry; tReq: number; // Timestamp when this repair request should be sent + requested: boolean; // Whether this repair has been requested already } /** @@ -30,66 +30,84 @@ export class OutgoingRepairBuffer { private items: OutgoingBufferEntry[] = []; private readonly maxSize: number; - constructor(maxSize = 1000) { + public constructor(maxSize = 1000) { this.maxSize = maxSize; } /** * Add a missing message to the outgoing repair request buffer * If message already exists, it is not updated (keeps original T_req) + * @returns true if the entry was added, false if it already existed */ - public add(entry: HistoryEntry, tReq: number): void { + public add(entry: HistoryEntry, tReq: number): boolean { const messageId = entry.messageId; - + // Check if already exists - do NOT update T_req per spec - const existingIndex = this.items.findIndex(item => item.entry.messageId === messageId); + const existingIndex = this.items.findIndex( + (item) => item.entry.messageId === messageId + ); if (existingIndex !== -1) { - log.info(`Message ${messageId} already in outgoing buffer, keeping original T_req`); - return; + log.info( + `Message ${messageId} already in outgoing buffer, keeping original T_req` + ); + return false; } // Check buffer size limit if (this.items.length >= this.maxSize) { // Evict oldest T_req entry (first in sorted array since we want to evict oldest) const evicted = this.items.shift()!; - log.warn(`Buffer full, evicted oldest entry ${evicted.entry.messageId} with T_req ${evicted.tReq}`); + log.warn( + `Buffer full, evicted oldest entry ${evicted.entry.messageId} with T_req ${evicted.tReq}` + ); } // Add new entry and re-sort - const newEntry: OutgoingBufferEntry = { entry, tReq }; + const newEntry: OutgoingBufferEntry = { entry, tReq, requested: false }; const combined = [...this.items, newEntry]; - + // Sort by T_req (ascending) combined.sort((a, b) => a.tReq - b.tReq); - + this.items = combined; log.info(`Added ${messageId} to outgoing buffer with T_req: ${tReq}`); + return true; } /** * Remove a message from the buffer (e.g., when received) */ public remove(messageId: string): void { - this.items = this.items.filter(item => item.entry.messageId !== messageId); + this.items = this.items.filter( + (item) => item.entry.messageId !== messageId + ); } /** * Get eligible repair requests (where T_req <= currentTime) * Returns up to maxRequests entries from the front of the sorted array + * Marks returned entries as requested but keeps them in buffer until received */ public getEligible(currentTime: number, maxRequests = 3): HistoryEntry[] { const eligible: HistoryEntry[] = []; - + // Iterate from front of sorted array (earliest T_req first) for (const item of this.items) { - if (item.tReq <= currentTime && eligible.length < maxRequests) { + // Only return items that are eligible and haven't been requested yet + if ( + item.tReq <= currentTime && + !item.requested && + eligible.length < maxRequests + ) { eligible.push(item.entry); - } else if (item.tReq > currentTime) { - // Since array is sorted, no more eligible entries - break; + // Mark as requested so we don't request it again + item.requested = true; + log.info( + `Repair request for ${item.entry.messageId} is eligible and marked as requested` + ); } } - + return eligible; } @@ -97,7 +115,7 @@ export class OutgoingRepairBuffer { * Check if a message is in the buffer */ public has(messageId: string): boolean { - return this.items.some(item => item.entry.messageId === messageId); + return this.items.some((item) => item.entry.messageId === messageId); } /** @@ -118,7 +136,7 @@ export class OutgoingRepairBuffer { * Get all entries (for testing/debugging) */ public getAll(): HistoryEntry[] { - return this.items.map(item => item.entry); + return this.items.map((item) => item.entry); } /** @@ -138,47 +156,55 @@ export class IncomingRepairBuffer { private items: IncomingBufferEntry[] = []; private readonly maxSize: number; - constructor(maxSize = 1000) { + public constructor(maxSize = 1000) { this.maxSize = maxSize; } /** * Add a repair request that we can fulfill * If message already exists, it is ignored (not updated) + * @returns true if the entry was added, false if it already existed */ - public add(entry: HistoryEntry, tResp: number): void { + public add(entry: HistoryEntry, tResp: number): boolean { const messageId = entry.messageId; - + // Check if already exists - ignore per spec - const existingIndex = this.items.findIndex(item => item.entry.messageId === messageId); + const existingIndex = this.items.findIndex( + (item) => item.entry.messageId === messageId + ); if (existingIndex !== -1) { log.info(`Message ${messageId} already in incoming buffer, ignoring`); - return; + return false; } // Check buffer size limit if (this.items.length >= this.maxSize) { // Evict furthest T_resp entry (last in sorted array) const evicted = this.items.pop()!; - log.warn(`Buffer full, evicted furthest entry ${evicted.entry.messageId} with T_resp ${evicted.tResp}`); + log.warn( + `Buffer full, evicted furthest entry ${evicted.entry.messageId} with T_resp ${evicted.tResp}` + ); } // Add new entry and re-sort const newEntry: IncomingBufferEntry = { entry, tResp }; const combined = [...this.items, newEntry]; - + // Sort by T_resp (ascending) combined.sort((a, b) => a.tResp - b.tResp); - + this.items = combined; log.info(`Added ${messageId} to incoming buffer with T_resp: ${tResp}`); + return true; } /** * Remove a message from the buffer */ public remove(messageId: string): void { - this.items = this.items.filter(item => item.entry.messageId !== messageId); + this.items = this.items.filter( + (item) => item.entry.messageId !== messageId + ); } /** @@ -188,7 +214,7 @@ export class IncomingRepairBuffer { public getReady(currentTime: number): HistoryEntry[] { const ready: HistoryEntry[] = []; const remaining: IncomingBufferEntry[] = []; - + for (const item of this.items) { if (item.tResp <= currentTime) { ready.push(item.entry); @@ -198,10 +224,10 @@ export class IncomingRepairBuffer { remaining.push(item); } } - + // Keep only non-ready entries this.items = remaining; - + return ready; } @@ -209,7 +235,7 @@ export class IncomingRepairBuffer { * Check if a message is in the buffer */ public has(messageId: string): boolean { - return this.items.some(item => item.entry.messageId === messageId); + return this.items.some((item) => item.entry.messageId === messageId); } /** @@ -230,7 +256,7 @@ export class IncomingRepairBuffer { * Get all entries (for testing/debugging) */ public getAll(): HistoryEntry[] { - return this.items.map(item => item.entry); + return this.items.map((item) => item.entry); } /** @@ -239,4 +265,4 @@ export class IncomingRepairBuffer { public getItems(): IncomingBufferEntry[] { return [...this.items]; } -} \ No newline at end of file +} diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 90c854c49b..79e1e69770 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -144,18 +144,20 @@ export class RepairManager { // Calculate when to request this repair const tReq = this.calculateTReq(entry.messageId, currentTime); - // Add to outgoing buffer - this.outgoingBuffer.add(entry, tReq); + // Add to outgoing buffer - only log and emit event if actually added + const wasAdded = this.outgoingBuffer.add(entry, tReq); - log.info( - `Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}` - ); + if (wasAdded) { + log.info( + `Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}` + ); - // Emit event - this.eventEmitter?.("RepairRequestQueued", { - messageId: entry.messageId, - tReq - }); + // Emit event + this.eventEmitter?.("RepairRequestQueued", { + messageId: entry.messageId, + tReq + }); + } } } @@ -234,18 +236,20 @@ export class RepairManager { currentTime ); - // Add to incoming buffer - this.incomingBuffer.add(request, tResp); + // Add to incoming buffer - only log and emit event if actually added + const wasAdded = this.incomingBuffer.add(request, tResp); - log.info( - `Will respond to repair request for ${request.messageId} at T_resp=${tResp}` - ); + if (wasAdded) { + log.info( + `Will respond to repair request for ${request.messageId} at T_resp=${tResp}` + ); - // Emit event - this.eventEmitter?.("RepairResponseQueued", { - messageId: request.messageId, - tResp - }); + // Emit event + this.eventEmitter?.("RepairResponseQueued", { + messageId: request.messageId, + tResp + }); + } } }