diff --git a/sds.nim b/sds.nim index f48b531..6df906d 100644 --- a/sds.nim +++ b/sds.nim @@ -116,18 +116,12 @@ proc wrapOutgoingMessage*( ) channel.bloomFilter.add(msg.messageId) - rm.addToHistory(msg.messageId, channelId) + # The full SdsMessage carries senderId and content, so a single + # addToHistory replaces the old triple-write to messageHistory, + # messageCache, and messageSenders. + rm.addToHistory(msg, channelId) - # SDS-R: record sender for future causal-history entries - if rm.participantId.len > 0: - channel.messageSenders[msg.messageId] = rm.participantId - - let serialized = serializeMessage(msg) - if serialized.isOk(): - # SDS-R: cache serialized bytes so we can serve our own message on repair - if channel.messageCache.len < rm.config.maxMessageHistory: - channel.messageCache[msg.messageId] = serialized.get() - return serialized + return serializeMessage(msg) except Exception: error "Failed to wrap message", channelId = channelId, msg = getCurrentExceptionMsg() @@ -156,7 +150,7 @@ proc processIncomingBuffer(rm: ReliabilityManager, channelId: SdsChannelID) {.gc continue if msgId in channel.incomingBuffer: - rm.addToHistory(msgId, channelId) + rm.addToHistory(channel.incomingBuffer[msgId].message, channelId) if not rm.onMessageReady.isNil(): rm.onMessageReady(msgId, channelId) processed.incl(msgId) @@ -200,35 +194,31 @@ proc unwrapReceivedMessage*( rm.updateLamportTimestamp(msg.lamportTimestamp, channelId) rm.reviewAckStatus(msg) - # SDS-R: cache the raw message for potential repair responses - if channel.messageCache.len < rm.config.maxMessageHistory: - channel.messageCache[msg.messageId] = message - - # SDS-R: record sender so our future causal-history entries carry it - if msg.senderId.len > 0: - channel.messageSenders[msg.messageId] = msg.senderId - - # SDS-R: process incoming repair requests from this message + # SDS-R: process incoming repair requests from this message. We can only + # answer for messages we have actually delivered (i.e. that live in + # messageHistory) — buffered-but-undelivered messages are not in a state + # to confidently rebroadcast. let now = getTime() for repairEntry in msg.repairRequest: # Remove from our own outgoing repair buffer (someone else is also requesting) channel.outgoingRepairBuffer.del(repairEntry.messageId) - # Check if we can respond to this repair request - if repairEntry.messageId in channel.messageCache and + if repairEntry.messageId in channel.messageHistory and rm.participantId.len > 0 and repairEntry.senderId.len > 0: if isInResponseGroup( rm.participantId, repairEntry.senderId, repairEntry.messageId, rm.config.numResponseGroups ): - let tResp = computeTResp( - rm.participantId, repairEntry.senderId, - repairEntry.messageId, rm.config.repairTMax - ) - channel.incomingRepairBuffer[repairEntry.messageId] = IncomingRepairEntry( - inHistEntry: repairEntry, - cachedMessage: channel.messageCache[repairEntry.messageId], - minTimeRepairResp: now + tResp, - ) + let serialized = serializeMessage(channel.messageHistory[repairEntry.messageId]) + if serialized.isOk(): + let tResp = computeTResp( + rm.participantId, repairEntry.senderId, + repairEntry.messageId, rm.config.repairTMax + ) + channel.incomingRepairBuffer[repairEntry.messageId] = IncomingRepairEntry( + inHistEntry: repairEntry, + cachedMessage: serialized.get(), + minTimeRepairResp: now + tResp, + ) var missingDeps = rm.checkDependencies(msg.causalHistory, channelId) @@ -242,7 +232,7 @@ proc unwrapReceivedMessage*( channel.incomingBuffer[msg.messageId] = IncomingMessage.init(message = msg, missingDeps = initHashSet[SdsMessageID]()) else: - rm.addToHistory(msg.messageId, channelId) + rm.addToHistory(msg, channelId) # Unblock any buffered messages that were waiting on this one. for pendingId, entry in channel.incomingBuffer: if msg.messageId in entry.missingDeps: @@ -435,13 +425,11 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE try: for channelId, channel in rm.channels: channel.lamportTimestamp = 0 - channel.messageHistory.setLen(0) + channel.messageHistory.clear() channel.outgoingBuffer.setLen(0) channel.incomingBuffer.clear() channel.outgoingRepairBuffer.clear() channel.incomingRepairBuffer.clear() - channel.messageCache.clear() - channel.messageSenders.clear() channel.bloomFilter = RollingBloomFilter.init(rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate) rm.channels.clear() diff --git a/sds/sds_utils.nim b/sds/sds_utils.nim index 2ff9ad4..eefae43 100644 --- a/sds/sds_utils.nim +++ b/sds/sds_utils.nim @@ -21,11 +21,9 @@ proc cleanup*(rm: ReliabilityManager) {.raises: [].} = for channelId, channel in rm.channels: channel.outgoingBuffer.setLen(0) channel.incomingBuffer.clear() - channel.messageHistory.setLen(0) + channel.messageHistory.clear() channel.outgoingRepairBuffer.clear() channel.incomingRepairBuffer.clear() - channel.messageCache.clear() - channel.messageSenders.clear() rm.channels.clear() except Exception: error "Error during cleanup", error = getCurrentExceptionMsg() @@ -42,17 +40,25 @@ proc cleanBloomFilter*( error = getCurrentExceptionMsg(), channelId = channelId proc addToHistory*( - rm: ReliabilityManager, msgId: SdsMessageID, channelId: SdsChannelID + rm: ReliabilityManager, msg: SdsMessage, channelId: SdsChannelID ) {.gcsafe, raises: [].} = + ## Inserts a delivered message into the channel's history map and evicts the + ## eldest entries when the bound is exceeded. The full SdsMessage is kept so + ## senderId is available for downstream causal-history population and the + ## bytes can be re-serialized on demand to answer SDS-R repair requests. try: if channelId in rm.channels: let channel = rm.channels[channelId] - channel.messageHistory.add(msgId) - if channel.messageHistory.len > rm.config.maxMessageHistory: - channel.messageHistory.delete(0) + channel.messageHistory[msg.messageId] = msg + while channel.messageHistory.len > rm.config.maxMessageHistory: + var firstKey: SdsMessageID + for k in channel.messageHistory.keys: + firstKey = k + break + channel.messageHistory.del(firstKey) except Exception: error "Failed to add to history", - channelId = channelId, msgId = msgId, error = getCurrentExceptionMsg() + channelId = channelId, msgId = msg.messageId, error = getCurrentExceptionMsg() proc updateLamportTimestamp*( rm: ReliabilityManager, msgTs: int64, channelId: SdsChannelID @@ -134,14 +140,17 @@ proc getRecentHistoryEntries*( try: if channelId in rm.channels: let channel = rm.channels[channelId] - let recentMessageIds = channel.messageHistory[max(0, channel.messageHistory.len - n) .. ^1] + var orderedIds: seq[SdsMessageID] = @[] + for msgId in channel.messageHistory.keys: + orderedIds.add(msgId) + let recentMessageIds = + orderedIds[max(0, orderedIds.len - n) .. ^1] var entries: seq[HistoryEntry] = @[] for msgId in recentMessageIds: var entry = HistoryEntry(messageId: msgId) if not rm.onRetrievalHint.isNil(): entry.retrievalHint = rm.onRetrievalHint(msgId) - if msgId in channel.messageSenders: - entry.senderId = channel.messageSenders[msgId] + entry.senderId = channel.messageHistory[msgId].senderId entries.add(entry) return entries else: @@ -175,7 +184,10 @@ proc getMessageHistory*( withLock rm.lock: try: if channelId in rm.channels: - return rm.channels[channelId].messageHistory + var ids: seq[SdsMessageID] = @[] + for msgId in rm.channels[channelId].messageHistory.keys: + ids.add(msgId) + return ids else: return @[] except Exception: @@ -246,11 +258,9 @@ proc removeChannel*( let channel = rm.channels[channelId] channel.outgoingBuffer.setLen(0) channel.incomingBuffer.clear() - channel.messageHistory.setLen(0) + channel.messageHistory.clear() channel.outgoingRepairBuffer.clear() channel.incomingRepairBuffer.clear() - channel.messageCache.clear() - channel.messageSenders.clear() rm.channels.del(channelId) return ok() except Exception: diff --git a/sds/types/channel_context.nim b/sds/types/channel_context.nim index 2f61584..3f7bee6 100644 --- a/sds/types/channel_context.nim +++ b/sds/types/channel_context.nim @@ -1,36 +1,36 @@ import std/tables import ./sds_message_id +import ./sds_message import ./rolling_bloom_filter import ./unacknowledged_message import ./incoming_message import ./repair_entry export - sds_message_id, rolling_bloom_filter, unacknowledged_message, incoming_message, - repair_entry + sds_message_id, sds_message, rolling_bloom_filter, unacknowledged_message, + incoming_message, repair_entry type ChannelContext* = ref object lamportTimestamp*: int64 - messageHistory*: seq[SdsMessageID] + messageHistory*: OrderedTable[SdsMessageID, SdsMessage] + ## Single source of truth for delivered messages. Holds the deserialized + ## SdsMessage (which carries senderId, lamportTimestamp, content, etc.) so + ## causal history, sender lookup, and SDS-R repair responses can all be + ## answered from one place. OrderedTable preserves insertion order for + ## causal-history tail access and FIFO eviction at maxMessageHistory. bloomFilter*: RollingBloomFilter outgoingBuffer*: seq[UnacknowledgedMessage] incomingBuffer*: Table[SdsMessageID, IncomingMessage] ## SDS-R buffers outgoingRepairBuffer*: Table[SdsMessageID, OutgoingRepairEntry] incomingRepairBuffer*: Table[SdsMessageID, IncomingRepairEntry] - messageCache*: Table[SdsMessageID, seq[byte]] - ## Cached serialized messages for repair responses - messageSenders*: Table[SdsMessageID, SdsParticipantID] - ## SDS-R: msgId -> original sender, used to populate causal-history senderId proc new*(T: type ChannelContext, bloomFilter: RollingBloomFilter): T = return T( lamportTimestamp: 0, - messageHistory: @[], + messageHistory: initOrderedTable[SdsMessageID, SdsMessage](), bloomFilter: bloomFilter, outgoingBuffer: @[], incomingBuffer: initTable[SdsMessageID, IncomingMessage](), outgoingRepairBuffer: initTable[SdsMessageID, OutgoingRepairEntry](), incomingRepairBuffer: initTable[SdsMessageID, IncomingRepairEntry](), - messageCache: initTable[SdsMessageID, seq[byte]](), - messageSenders: initTable[SdsMessageID, SdsParticipantID](), ) diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index 5528951..6346395 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -1157,7 +1157,7 @@ suite "SDS-R: Repair Buffer Management": let channel = rm.channels[testChannel] - # First, cache a message so we can respond to a repair request for it + # First, seed delivered history so we can respond to a repair request for it let cachedMsg = SdsMessage( messageId: "cached-msg", lamportTimestamp: 1, @@ -1166,8 +1166,7 @@ suite "SDS-R: Repair Buffer Management": content: @[byte(99)], bloomFilter: @[], ) - let cachedBytes = serializeMessage(cachedMsg).get() - channel.messageCache["cached-msg"] = cachedBytes + channel.messageHistory["cached-msg"] = cachedMsg # Receive a message with a repair request for "cached-msg" let msgWithRepair = SdsMessage( @@ -1361,7 +1360,14 @@ suite "SDS-R: Lifecycle and State": defer: rm.cleanup() check rm.ensureChannel(testChannel).isOk() let channel = rm.channels[testChannel] - channel.messageCache["m-wanted"] = @[byte(99), 99, 99] + channel.messageHistory["m-wanted"] = SdsMessage( + messageId: "m-wanted", + lamportTimestamp: 1, + causalHistory: @[], + channelId: testChannel, + content: @[byte(99), 99, 99], + bloomFilter: @[], + ) rm.setCallbacks( proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} = discard, @@ -1381,8 +1387,11 @@ suite "SDS-R: Lifecycle and State": discard rm.unwrapReceivedMessage(serializeMessage(msg).get()) check "m-wanted" notin channel.incomingRepairBuffer - test "wrapOutgoingMessage caches bytes and records sender": + test "wrapOutgoingMessage records the message in history with our senderId": # Proves Bug 1 is fixed — the original sender can serve her own message. + # In the consolidated history model, the SdsMessage itself carries senderId + # and can be re-serialized on demand for repair, so a single membership + # check + senderId read covers both halves of the original assertion. let rm = newReliabilityManager(participantId = "alice").get() defer: rm.cleanup() check rm.ensureChannel(testChannel).isOk() @@ -1390,10 +1399,9 @@ suite "SDS-R: Lifecycle and State": discard rm.wrapOutgoingMessage(@[byte(1), 2, 3], "m1", testChannel) let channel = rm.channels[testChannel] check: - "m1" in channel.messageCache - channel.messageCache["m1"].len > 0 - "m1" in channel.messageSenders - channel.messageSenders["m1"] == "alice" + "m1" in channel.messageHistory + channel.messageHistory["m1"].senderId == "alice" + channel.messageHistory["m1"].content == @[byte(1), 2, 3] test "getRecentHistoryEntries carries senderId for own messages": let rm = newReliabilityManager(participantId = "alice").get() @@ -1423,8 +1431,15 @@ suite "SDS-R: Lifecycle and State": cachedMessage: @[byte(1)], minTimeRepairResp: getTime(), ) - channel.messageCache["c"] = @[byte(2)] - channel.messageSenders["c"] = "someone" + channel.messageHistory["c"] = SdsMessage( + messageId: "c", + lamportTimestamp: 1, + causalHistory: @[], + channelId: testChannel, + content: @[byte(2)], + bloomFilter: @[], + senderId: "someone", + ) check rm.resetReliabilityManager().isOk() check rm.ensureChannel(testChannel).isOk() @@ -1432,8 +1447,7 @@ suite "SDS-R: Lifecycle and State": check: ch2.outgoingRepairBuffer.len == 0 ch2.incomingRepairBuffer.len == 0 - ch2.messageCache.len == 0 - ch2.messageSenders.len == 0 + ch2.messageHistory.len == 0 test "SDS-R state is isolated per channel": let rm = newReliabilityManager(participantId = "alice").get() @@ -1475,7 +1489,14 @@ suite "SDS-R: Lifecycle and State": ) # Carol already has M1 in history and has a pending incomingRepairBuffer entry - channel.messageHistory.add("m1") + channel.messageHistory["m1"] = SdsMessage( + messageId: "m1", + lamportTimestamp: 1, + causalHistory: @[], + channelId: testChannel, + content: @[byte(1)], + bloomFilter: @[], + ) channel.incomingRepairBuffer["m1"] = IncomingRepairEntry( inHistEntry: HistoryEntry(messageId: "m1", senderId: "alice"), cachedMessage: @[byte(1)], @@ -1799,7 +1820,7 @@ suite "SDS-R: Multi-Participant Integration": let bob = bus.addPeer("bob", cfg) let carol = bus.addPeer("carol", cfg) - # Both Bob and Carol receive the original M1 (so both have it in messageCache). + # Both Bob and Carol receive the original M1 (so both have it in messageHistory). bus.broadcast("alice", @[byte(1)], chosenMsg) # Now Dave arrives: build a fake requester message manually so its repair_request