diff --git a/packages/sds/src/message_channel/repair/buffers.ts b/packages/sds/src/message_channel/repair/buffers.ts index 6f014c257e..799832ced6 100644 --- a/packages/sds/src/message_channel/repair/buffers.ts +++ b/packages/sds/src/message_channel/repair/buffers.ts @@ -93,12 +93,14 @@ export class OutgoingRepairBuffer { // Iterate from front of sorted array (earliest T_req first) for (const item of this.items) { - // Only return items that are eligible and haven't been requested yet - if ( - item.tReq <= currentTime && - !item.requested && - eligible.length < maxRequests - ) { + // Since array is sorted, once we hit an item with tReq > currentTime, + // all remaining items also have tReq > currentTime + if (item.tReq > currentTime) { + break; + } + + // Only return items that haven't been requested yet + if (!item.requested && eligible.length < maxRequests) { eligible.push(item.entry); // Mark as requested so we don't request it again item.requested = true; @@ -106,6 +108,11 @@ export class OutgoingRepairBuffer { `Repair request for ${item.entry.messageId} is eligible and marked as requested` ); } + + // If we've found enough eligible items, exit early + if (eligible.length >= maxRequests) { + break; + } } return eligible; diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index d099cff68d..439ba0a82b 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -172,10 +172,22 @@ export class RepairManager { */ public onMessageReceived(messageId: string): void { // Remove from both buffers as we no longer need to request or respond - this.outgoingBuffer.remove(messageId); - this.incomingBuffer.remove(messageId); + const wasInOutgoing = this.outgoingBuffer.has(messageId); + const wasInIncoming = this.incomingBuffer.has(messageId); - log.info(`Removed ${messageId} from repair buffers after receipt`); + if (wasInOutgoing) { + this.outgoingBuffer.remove(messageId); + log.info( + `Removed ${messageId} from outgoing repair buffer after receipt` + ); + } + + if (wasInIncoming) { + this.incomingBuffer.remove(messageId); + log.info( + `Removed ${messageId} from incoming repair buffer after receipt` + ); + } } /**