diff --git a/sds.nim b/sds.nim index 7c724d5..202b489 100644 --- a/sds.nim +++ b/sds.nim @@ -384,35 +384,38 @@ proc runRepairSweep*(rm: ReliabilityManager) {.gcsafe, raises: [].} = ## - Incoming: fires onRepairReady for expired T_resp entries and removes them ## - Outgoing: drops entries past T_max window ## Exposed so it can be driven directly in tests; also invoked by periodicRepairSweep. - try: - let now = getTime() - for channelId, channel in rm.channels: - try: - # Check incoming repair buffer for expired T_resp (time to rebroadcast) - var toRebroadcast: seq[SdsMessageID] = @[] - for msgId, entry in channel.incomingRepairBuffer: - if now >= entry.minTimeRepairResp: - toRebroadcast.add(msgId) + ## Acquires rm.lock so the repair buffers cannot be observed mid-mutation by + ## a concurrent wrapOutgoingMessage / unwrapReceivedMessage on another thread. + withLock rm.lock: + try: + let now = getTime() + for channelId, channel in rm.channels: + try: + # Check incoming repair buffer for expired T_resp (time to rebroadcast) + var toRebroadcast: seq[SdsMessageID] = @[] + for msgId, entry in channel.incomingRepairBuffer: + if now >= entry.minTimeRepairResp: + toRebroadcast.add(msgId) - for msgId in toRebroadcast: - let entry = channel.incomingRepairBuffer[msgId] - channel.incomingRepairBuffer.del(msgId) - if not rm.onRepairReady.isNil(): - rm.onRepairReady(entry.cachedMessage, channelId) + for msgId in toRebroadcast: + let entry = channel.incomingRepairBuffer[msgId] + channel.incomingRepairBuffer.del(msgId) + if not rm.onRepairReady.isNil(): + rm.onRepairReady(entry.cachedMessage, channelId) - # Drop expired outgoing repair entries past T_max - var toRemove: seq[SdsMessageID] = @[] - let tMaxDuration = rm.config.repairTMax - for msgId, entry in channel.outgoingRepairBuffer: - if now - entry.minTimeRepairReq > tMaxDuration: - toRemove.add(msgId) - for msgId in toRemove: - channel.outgoingRepairBuffer.del(msgId) - except Exception: - error "Error in repair sweep for channel", - channelId = channelId, msg = getCurrentExceptionMsg() - except Exception: - error "Error in repair sweep", msg = getCurrentExceptionMsg() + # Drop expired outgoing repair entries past T_max + var toRemove: seq[SdsMessageID] = @[] + let tMaxDuration = rm.config.repairTMax + for msgId, entry in channel.outgoingRepairBuffer: + if now - entry.minTimeRepairReq > tMaxDuration: + toRemove.add(msgId) + for msgId in toRemove: + channel.outgoingRepairBuffer.del(msgId) + except Exception: + error "Error in repair sweep for channel", + channelId = channelId, msg = getCurrentExceptionMsg() + except Exception: + error "Error in repair sweep", msg = getCurrentExceptionMsg() proc periodicRepairSweep( rm: ReliabilityManager