From fda6ab0f8071ac6c4224e7d6d3c8c3d6bd2f8ab8 Mon Sep 17 00:00:00 2001 From: darshankabariya Date: Fri, 1 May 2026 02:20:22 +0530 Subject: [PATCH] feat: prioritise most-overdue entries when attaching repair requests --- sds.nim | 19 ++++++++++++++----- tests/test_reliability.nim | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/sds.nim b/sds.nim index 6df906d..7c724d5 100644 --- a/sds.nim +++ b/sds.nim @@ -1,4 +1,4 @@ -import std/[times, locks, tables, sets, options] +import std/[algorithm, times, locks, tables, sets, options] import chronos, results, chronicles import sds/[types, protobuf, sds_utils, rolling_bloom_filter] @@ -89,14 +89,23 @@ proc wrapOutgoingMessage*( error "Failed to serialize bloom filter", channelId = channelId return err(ReliabilityError.reSerializationError) - # SDS-R: collect eligible expired repair requests to attach + # SDS-R: collect eligible expired repair requests to attach. Per + # spec (sds-r-send-message, RECOMMENDED), prioritise the entries with + # the smallest minTimeRepairReq — they are the most overdue and the + # ones the network most needs us to ask about. var repairReqs: seq[HistoryEntry] = @[] let now = getTime() var expiredKeys: seq[SdsMessageID] = @[] + var eligible: seq[(SdsMessageID, OutgoingRepairEntry)] = @[] for msgId, repairEntry in channel.outgoingRepairBuffer: - if now >= repairEntry.minTimeRepairReq and repairReqs.len < rm.config.maxRepairRequests: - repairReqs.add(repairEntry.outHistEntry) - expiredKeys.add(msgId) + if now >= repairEntry.minTimeRepairReq: + eligible.add((msgId, repairEntry)) + eligible.sort do(a, b: (SdsMessageID, OutgoingRepairEntry)) -> int: + cmp(a[1].minTimeRepairReq, b[1].minTimeRepairReq) + let take = min(eligible.len, rm.config.maxRepairRequests) + for i in 0 ..< take: + repairReqs.add(eligible[i][1].outHistEntry) + expiredKeys.add(eligible[i][0]) for key in expiredKeys: channel.outgoingRepairBuffer.del(key) diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index 6346395..d6d8993 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -1148,6 +1148,43 @@ suite "SDS-R: Repair Buffer Management": # Should be removed from buffer after attaching "missing-msg" notin channel.outgoingRepairBuffer + test "expired repair requests attach the most-overdue first when capped": + # Per spec (sds-r-send-message, RECOMMENDED): when more entries are + # eligible than maxRepairRequests, attach the ones with the smallest + # minTimeRepairReq — i.e. the most overdue. + rm.setCallbacks( + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = discard, + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = discard, + proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = discard, + ) + let channel = rm.channels[testChannel] + let now = getTime() + + # Five eligible entries with strictly ordered minTimeRepairReq (most-overdue first). + # All are expired; the cap is the default 3, so two should be left behind. + let expected = ["oldest", "second", "third", "fourth", "newest"] + for i, id in expected: + channel.outgoingRepairBuffer[id] = OutgoingRepairEntry( + outHistEntry: HistoryEntry(messageId: id, senderId: "sender"), + minTimeRepairReq: now - initDuration(seconds = 50 - i * 10), + ) + + let wrapped = rm.wrapOutgoingMessage(@[byte(1)], "outbound", testChannel) + check wrapped.isOk() + + let attached = deserializeMessage(wrapped.get()).get().repairRequest + check: + attached.len == rm.config.maxRepairRequests + attached[0].messageId == "oldest" + attached[1].messageId == "second" + attached[2].messageId == "third" + # Two least-overdue remain in the buffer for next time. + "fourth" in channel.outgoingRepairBuffer + "newest" in channel.outgoingRepairBuffer + "oldest" notin channel.outgoingRepairBuffer + "second" notin channel.outgoingRepairBuffer + "third" notin channel.outgoingRepairBuffer + test "incoming repair request adds to incoming repair buffer when eligible": rm.setCallbacks( proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = discard,