feat: consolidate messageHistory/messageCache/messageSenders into one OrderedTable

This commit is contained in:
darshankabariya 2026-05-01 02:05:48 +05:30
parent 3cf816da76
commit aedf8e3945
No known key found for this signature in database
GPG Key ID: 9A92CCD9899F0D22
4 changed files with 95 additions and 76 deletions

60
sds.nim
View File

@ -116,18 +116,12 @@ proc wrapOutgoingMessage*(
)
channel.bloomFilter.add(msg.messageId)
rm.addToHistory(msg.messageId, channelId)
# The full SdsMessage carries senderId and content, so a single
# addToHistory replaces the old triple-write to messageHistory,
# messageCache, and messageSenders.
rm.addToHistory(msg, channelId)
# SDS-R: record sender for future causal-history entries
if rm.participantId.len > 0:
channel.messageSenders[msg.messageId] = rm.participantId
let serialized = serializeMessage(msg)
if serialized.isOk():
# SDS-R: cache serialized bytes so we can serve our own message on repair
if channel.messageCache.len < rm.config.maxMessageHistory:
channel.messageCache[msg.messageId] = serialized.get()
return serialized
return serializeMessage(msg)
except Exception:
error "Failed to wrap message",
channelId = channelId, msg = getCurrentExceptionMsg()
@ -156,7 +150,7 @@ proc processIncomingBuffer(rm: ReliabilityManager, channelId: SdsChannelID) {.gc
continue
if msgId in channel.incomingBuffer:
rm.addToHistory(msgId, channelId)
rm.addToHistory(channel.incomingBuffer[msgId].message, channelId)
if not rm.onMessageReady.isNil():
rm.onMessageReady(msgId, channelId)
processed.incl(msgId)
@ -200,35 +194,31 @@ proc unwrapReceivedMessage*(
rm.updateLamportTimestamp(msg.lamportTimestamp, channelId)
rm.reviewAckStatus(msg)
# SDS-R: cache the raw message for potential repair responses
if channel.messageCache.len < rm.config.maxMessageHistory:
channel.messageCache[msg.messageId] = message
# SDS-R: record sender so our future causal-history entries carry it
if msg.senderId.len > 0:
channel.messageSenders[msg.messageId] = msg.senderId
# SDS-R: process incoming repair requests from this message
# SDS-R: process incoming repair requests from this message. We can only
# answer for messages we have actually delivered (i.e. that live in
# messageHistory) — buffered-but-undelivered messages are not in a state
# to confidently rebroadcast.
let now = getTime()
for repairEntry in msg.repairRequest:
# Remove from our own outgoing repair buffer (someone else is also requesting)
channel.outgoingRepairBuffer.del(repairEntry.messageId)
# Check if we can respond to this repair request
if repairEntry.messageId in channel.messageCache and
if repairEntry.messageId in channel.messageHistory and
rm.participantId.len > 0 and repairEntry.senderId.len > 0:
if isInResponseGroup(
rm.participantId, repairEntry.senderId,
repairEntry.messageId, rm.config.numResponseGroups
):
let tResp = computeTResp(
rm.participantId, repairEntry.senderId,
repairEntry.messageId, rm.config.repairTMax
)
channel.incomingRepairBuffer[repairEntry.messageId] = IncomingRepairEntry(
inHistEntry: repairEntry,
cachedMessage: channel.messageCache[repairEntry.messageId],
minTimeRepairResp: now + tResp,
)
let serialized = serializeMessage(channel.messageHistory[repairEntry.messageId])
if serialized.isOk():
let tResp = computeTResp(
rm.participantId, repairEntry.senderId,
repairEntry.messageId, rm.config.repairTMax
)
channel.incomingRepairBuffer[repairEntry.messageId] = IncomingRepairEntry(
inHistEntry: repairEntry,
cachedMessage: serialized.get(),
minTimeRepairResp: now + tResp,
)
var missingDeps = rm.checkDependencies(msg.causalHistory, channelId)
@ -242,7 +232,7 @@ proc unwrapReceivedMessage*(
channel.incomingBuffer[msg.messageId] =
IncomingMessage.init(message = msg, missingDeps = initHashSet[SdsMessageID]())
else:
rm.addToHistory(msg.messageId, channelId)
rm.addToHistory(msg, channelId)
# Unblock any buffered messages that were waiting on this one.
for pendingId, entry in channel.incomingBuffer:
if msg.messageId in entry.missingDeps:
@ -435,13 +425,11 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE
try:
for channelId, channel in rm.channels:
channel.lamportTimestamp = 0
channel.messageHistory.setLen(0)
channel.messageHistory.clear()
channel.outgoingBuffer.setLen(0)
channel.incomingBuffer.clear()
channel.outgoingRepairBuffer.clear()
channel.incomingRepairBuffer.clear()
channel.messageCache.clear()
channel.messageSenders.clear()
channel.bloomFilter =
RollingBloomFilter.init(rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate)
rm.channels.clear()

View File

@ -21,11 +21,9 @@ proc cleanup*(rm: ReliabilityManager) {.raises: [].} =
for channelId, channel in rm.channels:
channel.outgoingBuffer.setLen(0)
channel.incomingBuffer.clear()
channel.messageHistory.setLen(0)
channel.messageHistory.clear()
channel.outgoingRepairBuffer.clear()
channel.incomingRepairBuffer.clear()
channel.messageCache.clear()
channel.messageSenders.clear()
rm.channels.clear()
except Exception:
error "Error during cleanup", error = getCurrentExceptionMsg()
@ -42,17 +40,25 @@ proc cleanBloomFilter*(
error = getCurrentExceptionMsg(), channelId = channelId
proc addToHistory*(
rm: ReliabilityManager, msgId: SdsMessageID, channelId: SdsChannelID
rm: ReliabilityManager, msg: SdsMessage, channelId: SdsChannelID
) {.gcsafe, raises: [].} =
## Inserts a delivered message into the channel's history map and evicts the
## eldest entries when the bound is exceeded. The full SdsMessage is kept so
## senderId is available for downstream causal-history population and the
## bytes can be re-serialized on demand to answer SDS-R repair requests.
try:
if channelId in rm.channels:
let channel = rm.channels[channelId]
channel.messageHistory.add(msgId)
if channel.messageHistory.len > rm.config.maxMessageHistory:
channel.messageHistory.delete(0)
channel.messageHistory[msg.messageId] = msg
while channel.messageHistory.len > rm.config.maxMessageHistory:
var firstKey: SdsMessageID
for k in channel.messageHistory.keys:
firstKey = k
break
channel.messageHistory.del(firstKey)
except Exception:
error "Failed to add to history",
channelId = channelId, msgId = msgId, error = getCurrentExceptionMsg()
channelId = channelId, msgId = msg.messageId, error = getCurrentExceptionMsg()
proc updateLamportTimestamp*(
rm: ReliabilityManager, msgTs: int64, channelId: SdsChannelID
@ -134,14 +140,17 @@ proc getRecentHistoryEntries*(
try:
if channelId in rm.channels:
let channel = rm.channels[channelId]
let recentMessageIds = channel.messageHistory[max(0, channel.messageHistory.len - n) .. ^1]
var orderedIds: seq[SdsMessageID] = @[]
for msgId in channel.messageHistory.keys:
orderedIds.add(msgId)
let recentMessageIds =
orderedIds[max(0, orderedIds.len - n) .. ^1]
var entries: seq[HistoryEntry] = @[]
for msgId in recentMessageIds:
var entry = HistoryEntry(messageId: msgId)
if not rm.onRetrievalHint.isNil():
entry.retrievalHint = rm.onRetrievalHint(msgId)
if msgId in channel.messageSenders:
entry.senderId = channel.messageSenders[msgId]
entry.senderId = channel.messageHistory[msgId].senderId
entries.add(entry)
return entries
else:
@ -175,7 +184,10 @@ proc getMessageHistory*(
withLock rm.lock:
try:
if channelId in rm.channels:
return rm.channels[channelId].messageHistory
var ids: seq[SdsMessageID] = @[]
for msgId in rm.channels[channelId].messageHistory.keys:
ids.add(msgId)
return ids
else:
return @[]
except Exception:
@ -246,11 +258,9 @@ proc removeChannel*(
let channel = rm.channels[channelId]
channel.outgoingBuffer.setLen(0)
channel.incomingBuffer.clear()
channel.messageHistory.setLen(0)
channel.messageHistory.clear()
channel.outgoingRepairBuffer.clear()
channel.incomingRepairBuffer.clear()
channel.messageCache.clear()
channel.messageSenders.clear()
rm.channels.del(channelId)
return ok()
except Exception:

View File

@ -1,36 +1,36 @@
import std/tables
import ./sds_message_id
import ./sds_message
import ./rolling_bloom_filter
import ./unacknowledged_message
import ./incoming_message
import ./repair_entry
export
sds_message_id, rolling_bloom_filter, unacknowledged_message, incoming_message,
repair_entry
sds_message_id, sds_message, rolling_bloom_filter, unacknowledged_message,
incoming_message, repair_entry
type ChannelContext* = ref object
lamportTimestamp*: int64
messageHistory*: seq[SdsMessageID]
messageHistory*: OrderedTable[SdsMessageID, SdsMessage]
## Single source of truth for delivered messages. Holds the deserialized
## SdsMessage (which carries senderId, lamportTimestamp, content, etc.) so
## causal history, sender lookup, and SDS-R repair responses can all be
## answered from one place. OrderedTable preserves insertion order for
## causal-history tail access and FIFO eviction at maxMessageHistory.
bloomFilter*: RollingBloomFilter
outgoingBuffer*: seq[UnacknowledgedMessage]
incomingBuffer*: Table[SdsMessageID, IncomingMessage]
## SDS-R buffers
outgoingRepairBuffer*: Table[SdsMessageID, OutgoingRepairEntry]
incomingRepairBuffer*: Table[SdsMessageID, IncomingRepairEntry]
messageCache*: Table[SdsMessageID, seq[byte]]
## Cached serialized messages for repair responses
messageSenders*: Table[SdsMessageID, SdsParticipantID]
## SDS-R: msgId -> original sender, used to populate causal-history senderId
proc new*(T: type ChannelContext, bloomFilter: RollingBloomFilter): T =
return T(
lamportTimestamp: 0,
messageHistory: @[],
messageHistory: initOrderedTable[SdsMessageID, SdsMessage](),
bloomFilter: bloomFilter,
outgoingBuffer: @[],
incomingBuffer: initTable[SdsMessageID, IncomingMessage](),
outgoingRepairBuffer: initTable[SdsMessageID, OutgoingRepairEntry](),
incomingRepairBuffer: initTable[SdsMessageID, IncomingRepairEntry](),
messageCache: initTable[SdsMessageID, seq[byte]](),
messageSenders: initTable[SdsMessageID, SdsParticipantID](),
)

View File

@ -1157,7 +1157,7 @@ suite "SDS-R: Repair Buffer Management":
let channel = rm.channels[testChannel]
# First, cache a message so we can respond to a repair request for it
# First, seed delivered history so we can respond to a repair request for it
let cachedMsg = SdsMessage(
messageId: "cached-msg",
lamportTimestamp: 1,
@ -1166,8 +1166,7 @@ suite "SDS-R: Repair Buffer Management":
content: @[byte(99)],
bloomFilter: @[],
)
let cachedBytes = serializeMessage(cachedMsg).get()
channel.messageCache["cached-msg"] = cachedBytes
channel.messageHistory["cached-msg"] = cachedMsg
# Receive a message with a repair request for "cached-msg"
let msgWithRepair = SdsMessage(
@ -1361,7 +1360,14 @@ suite "SDS-R: Lifecycle and State":
defer: rm.cleanup()
check rm.ensureChannel(testChannel).isOk()
let channel = rm.channels[testChannel]
channel.messageCache["m-wanted"] = @[byte(99), 99, 99]
channel.messageHistory["m-wanted"] = SdsMessage(
messageId: "m-wanted",
lamportTimestamp: 1,
causalHistory: @[],
channelId: testChannel,
content: @[byte(99), 99, 99],
bloomFilter: @[],
)
rm.setCallbacks(
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} = discard,
@ -1381,8 +1387,11 @@ suite "SDS-R: Lifecycle and State":
discard rm.unwrapReceivedMessage(serializeMessage(msg).get())
check "m-wanted" notin channel.incomingRepairBuffer
test "wrapOutgoingMessage caches bytes and records sender":
test "wrapOutgoingMessage records the message in history with our senderId":
# Proves Bug 1 is fixed — the original sender can serve her own message.
# In the consolidated history model, the SdsMessage itself carries senderId
# and can be re-serialized on demand for repair, so a single membership
# check + senderId read covers both halves of the original assertion.
let rm = newReliabilityManager(participantId = "alice").get()
defer: rm.cleanup()
check rm.ensureChannel(testChannel).isOk()
@ -1390,10 +1399,9 @@ suite "SDS-R: Lifecycle and State":
discard rm.wrapOutgoingMessage(@[byte(1), 2, 3], "m1", testChannel)
let channel = rm.channels[testChannel]
check:
"m1" in channel.messageCache
channel.messageCache["m1"].len > 0
"m1" in channel.messageSenders
channel.messageSenders["m1"] == "alice"
"m1" in channel.messageHistory
channel.messageHistory["m1"].senderId == "alice"
channel.messageHistory["m1"].content == @[byte(1), 2, 3]
test "getRecentHistoryEntries carries senderId for own messages":
let rm = newReliabilityManager(participantId = "alice").get()
@ -1423,8 +1431,15 @@ suite "SDS-R: Lifecycle and State":
cachedMessage: @[byte(1)],
minTimeRepairResp: getTime(),
)
channel.messageCache["c"] = @[byte(2)]
channel.messageSenders["c"] = "someone"
channel.messageHistory["c"] = SdsMessage(
messageId: "c",
lamportTimestamp: 1,
causalHistory: @[],
channelId: testChannel,
content: @[byte(2)],
bloomFilter: @[],
senderId: "someone",
)
check rm.resetReliabilityManager().isOk()
check rm.ensureChannel(testChannel).isOk()
@ -1432,8 +1447,7 @@ suite "SDS-R: Lifecycle and State":
check:
ch2.outgoingRepairBuffer.len == 0
ch2.incomingRepairBuffer.len == 0
ch2.messageCache.len == 0
ch2.messageSenders.len == 0
ch2.messageHistory.len == 0
test "SDS-R state is isolated per channel":
let rm = newReliabilityManager(participantId = "alice").get()
@ -1475,7 +1489,14 @@ suite "SDS-R: Lifecycle and State":
)
# Carol already has M1 in history and has a pending incomingRepairBuffer entry
channel.messageHistory.add("m1")
channel.messageHistory["m1"] = SdsMessage(
messageId: "m1",
lamportTimestamp: 1,
causalHistory: @[],
channelId: testChannel,
content: @[byte(1)],
bloomFilter: @[],
)
channel.incomingRepairBuffer["m1"] = IncomingRepairEntry(
inHistEntry: HistoryEntry(messageId: "m1", senderId: "alice"),
cachedMessage: @[byte(1)],
@ -1799,7 +1820,7 @@ suite "SDS-R: Multi-Participant Integration":
let bob = bus.addPeer("bob", cfg)
let carol = bus.addPeer("carol", cfg)
# Both Bob and Carol receive the original M1 (so both have it in messageCache).
# Both Bob and Carol receive the original M1 (so both have it in messageHistory).
bus.broadcast("alice", @[byte(1)], chosenMsg)
# Now Dave arrives: build a fake requester message manually so its repair_request