feat: acquire rm.lock in runRepairSweep

This commit is contained in:
darshankabariya 2026-05-01 02:38:07 +05:30
parent fda6ab0f80
commit 0516b01542
No known key found for this signature in database
GPG Key ID: 9A92CCD9899F0D22

57
sds.nim
View File

@ -384,35 +384,38 @@ proc runRepairSweep*(rm: ReliabilityManager) {.gcsafe, raises: [].} =
## - Incoming: fires onRepairReady for expired T_resp entries and removes them
## - Outgoing: drops entries past T_max window
## Exposed so it can be driven directly in tests; also invoked by periodicRepairSweep.
try:
let now = getTime()
for channelId, channel in rm.channels:
try:
# Check incoming repair buffer for expired T_resp (time to rebroadcast)
var toRebroadcast: seq[SdsMessageID] = @[]
for msgId, entry in channel.incomingRepairBuffer:
if now >= entry.minTimeRepairResp:
toRebroadcast.add(msgId)
## Acquires rm.lock so the repair buffers cannot be observed mid-mutation by
## a concurrent wrapOutgoingMessage / unwrapReceivedMessage on another thread.
withLock rm.lock:
try:
let now = getTime()
for channelId, channel in rm.channels:
try:
# Check incoming repair buffer for expired T_resp (time to rebroadcast)
var toRebroadcast: seq[SdsMessageID] = @[]
for msgId, entry in channel.incomingRepairBuffer:
if now >= entry.minTimeRepairResp:
toRebroadcast.add(msgId)
for msgId in toRebroadcast:
let entry = channel.incomingRepairBuffer[msgId]
channel.incomingRepairBuffer.del(msgId)
if not rm.onRepairReady.isNil():
rm.onRepairReady(entry.cachedMessage, channelId)
for msgId in toRebroadcast:
let entry = channel.incomingRepairBuffer[msgId]
channel.incomingRepairBuffer.del(msgId)
if not rm.onRepairReady.isNil():
rm.onRepairReady(entry.cachedMessage, channelId)
# Drop expired outgoing repair entries past T_max
var toRemove: seq[SdsMessageID] = @[]
let tMaxDuration = rm.config.repairTMax
for msgId, entry in channel.outgoingRepairBuffer:
if now - entry.minTimeRepairReq > tMaxDuration:
toRemove.add(msgId)
for msgId in toRemove:
channel.outgoingRepairBuffer.del(msgId)
except Exception:
error "Error in repair sweep for channel",
channelId = channelId, msg = getCurrentExceptionMsg()
except Exception:
error "Error in repair sweep", msg = getCurrentExceptionMsg()
# Drop expired outgoing repair entries past T_max
var toRemove: seq[SdsMessageID] = @[]
let tMaxDuration = rm.config.repairTMax
for msgId, entry in channel.outgoingRepairBuffer:
if now - entry.minTimeRepairReq > tMaxDuration:
toRemove.add(msgId)
for msgId in toRemove:
channel.outgoingRepairBuffer.del(msgId)
except Exception:
error "Error in repair sweep for channel",
channelId = channelId, msg = getCurrentExceptionMsg()
except Exception:
error "Error in repair sweep", msg = getCurrentExceptionMsg()
proc periodicRepairSweep(
rm: ReliabilityManager