mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-06-09 18:59:58 +00:00
feat: restore {.requiresInit.} on SdsMessage and migrate test sites
This commit is contained in:
parent
0516b01542
commit
8cbcd756bb
@ -2,7 +2,7 @@ import ./sds_message_id
|
||||
import ./history_entry
|
||||
export sds_message_id, history_entry
|
||||
|
||||
type SdsMessage* = object
|
||||
type SdsMessage* {.requiresInit.} = object
|
||||
messageId*: SdsMessageID
|
||||
lamportTimestamp*: int64
|
||||
causalHistory*: seq[HistoryEntry]
|
||||
|
||||
@ -82,22 +82,22 @@ suite "Core Operations":
|
||||
|
||||
test "message ordering":
|
||||
# Create messages with different timestamps
|
||||
let msg1 = SdsMessage(
|
||||
messageId: "msg1",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(1)],
|
||||
bloomFilter: @[],
|
||||
let msg1 = SdsMessage.init(
|
||||
messageId = "msg1",
|
||||
lamportTimestamp = 1,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(1)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
let msg2 = SdsMessage(
|
||||
messageId: "msg2",
|
||||
lamportTimestamp: 5,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
let msg2 = SdsMessage.init(
|
||||
messageId = "msg2",
|
||||
lamportTimestamp = 5,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(2)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
let serialized1 = serializeMessage(msg1)
|
||||
@ -148,22 +148,22 @@ suite "Reliability Mechanisms":
|
||||
let id3 = "msg3"
|
||||
|
||||
# Create messages with dependencies
|
||||
let msg2 = SdsMessage(
|
||||
messageId: id2,
|
||||
lamportTimestamp: 2,
|
||||
causalHistory: toCausalHistory(@[id1]), # msg2 depends on msg1
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
let msg2 = SdsMessage.init(
|
||||
messageId = id2,
|
||||
lamportTimestamp = 2,
|
||||
causalHistory = toCausalHistory(@[id1]), # msg2 depends on msg1
|
||||
channelId = testChannel,
|
||||
content = @[byte(2)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
let msg3 = SdsMessage(
|
||||
messageId: id3,
|
||||
lamportTimestamp: 3,
|
||||
causalHistory: toCausalHistory(@[id1, id2]), # msg3 depends on both msg1 and msg2
|
||||
channelId: testChannel,
|
||||
content: @[byte(3)],
|
||||
bloomFilter: @[],
|
||||
let msg3 = SdsMessage.init(
|
||||
messageId = id3,
|
||||
lamportTimestamp = 3,
|
||||
causalHistory = toCausalHistory(@[id1, id2]), # msg3 depends on both msg1 and msg2
|
||||
channelId = testChannel,
|
||||
content = @[byte(3)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
let serialized2 = serializeMessage(msg2)
|
||||
@ -230,13 +230,13 @@ suite "Reliability Mechanisms":
|
||||
# missing from messageHistory. The manager must still report it missing.
|
||||
rm.channels[testChannel].bloomFilter.add(id1)
|
||||
|
||||
let msg2 = SdsMessage(
|
||||
messageId: id2,
|
||||
lamportTimestamp: 2,
|
||||
causalHistory: toCausalHistory(@[id1]),
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
let msg2 = SdsMessage.init(
|
||||
messageId = id2,
|
||||
lamportTimestamp = 2,
|
||||
causalHistory = toCausalHistory(@[id1]),
|
||||
channelId = testChannel,
|
||||
content = @[byte(2)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
let serialized2 = serializeMessage(msg2)
|
||||
check serialized2.isOk()
|
||||
@ -272,13 +272,13 @@ suite "Reliability Mechanisms":
|
||||
check wrap1.isOk()
|
||||
|
||||
# Create a message that has our message in causal history
|
||||
let msg2 = SdsMessage(
|
||||
messageId: "msg2",
|
||||
lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1,
|
||||
causalHistory: toCausalHistory(@[id1]), # Include our message in causal history
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[] # Test with an empty bloom filter
|
||||
let msg2 = SdsMessage.init(
|
||||
messageId = "msg2",
|
||||
lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
|
||||
causalHistory = toCausalHistory(@[id1]), # Include our message in causal history
|
||||
channelId = testChannel,
|
||||
content = @[byte(2)],
|
||||
bloomFilter = @[] # Test with an empty bloom filter
|
||||
,
|
||||
)
|
||||
|
||||
@ -316,13 +316,13 @@ suite "Reliability Mechanisms":
|
||||
let wrap1 = rm.wrapOutgoingMessage(msg1, id1, testChannel)
|
||||
check wrap1.isOk()
|
||||
|
||||
let msg2 = SdsMessage(
|
||||
messageId: "msg2",
|
||||
lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1,
|
||||
causalHistory: toCausalHistory(@[id1]),
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
let msg2 = SdsMessage.init(
|
||||
messageId = "msg2",
|
||||
lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
|
||||
causalHistory = toCausalHistory(@[id1]),
|
||||
channelId = testChannel,
|
||||
content = @[byte(2)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
let serializedMsg2 = serializeMessage(msg2)
|
||||
check serializedMsg2.isOk()
|
||||
@ -360,13 +360,13 @@ suite "Reliability Mechanisms":
|
||||
let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter)
|
||||
check bfResult.isOk()
|
||||
|
||||
let msg2 = SdsMessage(
|
||||
messageId: "msg2",
|
||||
lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1,
|
||||
causalHistory: @[], # Empty causal history as we're using bloom filter
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: bfResult.get(),
|
||||
let msg2 = SdsMessage.init(
|
||||
messageId = "msg2",
|
||||
lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
|
||||
causalHistory = @[], # Empty causal history as we're using bloom filter
|
||||
channelId = testChannel,
|
||||
content = @[byte(2)],
|
||||
bloomFilter = bfResult.get(),
|
||||
)
|
||||
|
||||
let serializedMsg2 = serializeMessage(msg2)
|
||||
@ -406,13 +406,13 @@ suite "Reliability Mechanisms":
|
||||
let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter)
|
||||
check bfResult.isOk()
|
||||
|
||||
let msg2 = SdsMessage(
|
||||
messageId: "msg2",
|
||||
lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: bfResult.get(),
|
||||
let msg2 = SdsMessage.init(
|
||||
messageId = "msg2",
|
||||
lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(2)],
|
||||
bloomFilter = bfResult.get(),
|
||||
)
|
||||
let serializedMsg2 = serializeMessage(msg2)
|
||||
check serializedMsg2.isOk()
|
||||
@ -429,13 +429,13 @@ suite "Reliability Mechanisms":
|
||||
rm.seedBloom(testChannel, 40, prefix = "delivered-")
|
||||
|
||||
# Plus a real delivery so we exercise the bloom-on-delivery path too.
|
||||
let incoming = SdsMessage(
|
||||
messageId: "incoming-1",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(9)],
|
||||
bloomFilter: @[],
|
||||
let incoming = SdsMessage.init(
|
||||
messageId = "incoming-1",
|
||||
lamportTimestamp = 1,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(9)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
let serIncoming = serializeMessage(incoming)
|
||||
check serIncoming.isOk()
|
||||
@ -497,13 +497,13 @@ suite "Reliability Mechanisms":
|
||||
check unwrappedMsg2.causalHistory[0].retrievalHint == cast[seq[byte]]("hint:" & id1)
|
||||
|
||||
# Create a message with a missing dependency (no retrieval hint)
|
||||
let msg3 = SdsMessage(
|
||||
messageId: "msg3",
|
||||
lamportTimestamp: 3,
|
||||
causalHistory: toCausalHistory(@["missing-dep"]),
|
||||
channelId: testChannel,
|
||||
content: @[byte(3)],
|
||||
bloomFilter: @[],
|
||||
let msg3 = SdsMessage.init(
|
||||
messageId = "msg3",
|
||||
lamportTimestamp = 3,
|
||||
causalHistory = toCausalHistory(@["missing-dep"]),
|
||||
channelId = testChannel,
|
||||
content = @[byte(3)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
let serialized3 = serializeMessage(msg3).get()
|
||||
let unwrapResult3 = rm.unwrapReceivedMessage(serialized3)
|
||||
@ -515,13 +515,13 @@ suite "Reliability Mechanisms":
|
||||
check missingDeps3[0].retrievalHint.len == 0
|
||||
|
||||
# Test with a message that HAS a retrieval hint from remote
|
||||
let msg4 = SdsMessage(
|
||||
messageId: "msg4",
|
||||
lamportTimestamp: 4,
|
||||
causalHistory: @[newHistoryEntry("another-missing", cast[seq[byte]]("remote-hint"))],
|
||||
channelId: testChannel,
|
||||
content: @[byte(4)],
|
||||
bloomFilter: @[],
|
||||
let msg4 = SdsMessage.init(
|
||||
messageId = "msg4",
|
||||
lamportTimestamp = 4,
|
||||
causalHistory = @[newHistoryEntry("another-missing", cast[seq[byte]]("remote-hint"))],
|
||||
channelId = testChannel,
|
||||
content = @[byte(4)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
let serialized4 = serializeMessage(msg4).get()
|
||||
let unwrapResult4 = rm.unwrapReceivedMessage(serialized4)
|
||||
@ -569,13 +569,13 @@ suite "Periodic Tasks & Buffer Management":
|
||||
check outBuffer.len == 6
|
||||
|
||||
# Create message that acknowledges some messages
|
||||
let ackMsg = SdsMessage(
|
||||
messageId: "ack1",
|
||||
lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1,
|
||||
causalHistory: toCausalHistory(@["msg0", "msg2", "msg4"]),
|
||||
channelId: testChannel,
|
||||
content: @[byte(100)],
|
||||
bloomFilter: @[],
|
||||
let ackMsg = SdsMessage.init(
|
||||
messageId = "ack1",
|
||||
lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
|
||||
causalHistory = toCausalHistory(@["msg0", "msg2", "msg4"]),
|
||||
channelId = testChannel,
|
||||
content = @[byte(100)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
let serializedAck = serializeMessage(ackMsg)
|
||||
@ -698,13 +698,13 @@ suite "Special Cases Handling":
|
||||
history[^1] == "msg" & $(rm.config.maxMessageHistory + 5)
|
||||
|
||||
test "invalid bloom filter handling":
|
||||
let msgInvalid = SdsMessage(
|
||||
messageId: "invalid-bf",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: toCausalHistory(@[]),
|
||||
channelId: testChannel,
|
||||
content: @[byte(1)],
|
||||
bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data
|
||||
let msgInvalid = SdsMessage.init(
|
||||
messageId = "invalid-bf",
|
||||
lamportTimestamp = 1,
|
||||
causalHistory = toCausalHistory(@[]),
|
||||
channelId = testChannel,
|
||||
content = @[byte(1)],
|
||||
bloomFilter = @[1.byte, 2.byte, 3.byte] # Invalid filter data
|
||||
,
|
||||
)
|
||||
|
||||
@ -729,13 +729,13 @@ suite "Special Cases Handling":
|
||||
)
|
||||
|
||||
# Create and process a message
|
||||
let msg = SdsMessage(
|
||||
messageId: "dup-msg",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: toCausalHistory(@[]),
|
||||
channelId: testChannel,
|
||||
content: @[byte(1)],
|
||||
bloomFilter: @[],
|
||||
let msg = SdsMessage.init(
|
||||
messageId = "dup-msg",
|
||||
lamportTimestamp = 1,
|
||||
causalHistory = toCausalHistory(@[]),
|
||||
channelId = testChannel,
|
||||
content = @[byte(1)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
let serialized = serializeMessage(msg)
|
||||
@ -930,22 +930,22 @@ suite "Multi-Channel ReliabilityManager Tests":
|
||||
|
||||
# Create acknowledgment messages that include our message IDs in causal history
|
||||
# to trigger sent callbacks
|
||||
let ackMsg1 = SdsMessage(
|
||||
messageId: "ack1",
|
||||
lamportTimestamp: rm.channels[channel1].lamportTimestamp + 1,
|
||||
causalHistory: toCausalHistory(@[msgId1]), # Acknowledge msg1
|
||||
channelId: channel1,
|
||||
content: @[byte(100)],
|
||||
bloomFilter: @[],
|
||||
let ackMsg1 = SdsMessage.init(
|
||||
messageId = "ack1",
|
||||
lamportTimestamp = rm.channels[channel1].lamportTimestamp + 1,
|
||||
causalHistory = toCausalHistory(@[msgId1]), # Acknowledge msg1
|
||||
channelId = channel1,
|
||||
content = @[byte(100)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
let ackMsg2 = SdsMessage(
|
||||
messageId: "ack2",
|
||||
lamportTimestamp: rm.channels[channel2].lamportTimestamp + 1,
|
||||
causalHistory: toCausalHistory(@[msgId2]), # Acknowledge msg2
|
||||
channelId: channel2,
|
||||
content: @[byte(101)],
|
||||
bloomFilter: @[],
|
||||
let ackMsg2 = SdsMessage.init(
|
||||
messageId = "ack2",
|
||||
lamportTimestamp = rm.channels[channel2].lamportTimestamp + 1,
|
||||
causalHistory = toCausalHistory(@[msgId2]), # Acknowledge msg2
|
||||
channelId = channel2,
|
||||
content = @[byte(101)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
let serializedAck1 = serializeMessage(ackMsg1)
|
||||
@ -1016,7 +1016,7 @@ suite "SDS-R: Computation Functions":
|
||||
let d = computeTResp("other-node", "sender1", "msg1", initDuration(seconds = 300))
|
||||
check d.inMilliseconds >= 0
|
||||
|
||||
test "isInResponseGroup all in same group when numGroups=1":
|
||||
test "isInResponseGroup all in same group when numGroups =1":
|
||||
check isInResponseGroup("p1", "sender1", "msg1", 1) == true
|
||||
check isInResponseGroup("p2", "sender1", "msg1", 1) == true
|
||||
|
||||
@ -1051,13 +1051,13 @@ suite "SDS-R: Repair Buffer Management":
|
||||
)
|
||||
|
||||
# Create a message with a missing dependency
|
||||
let msg = SdsMessage(
|
||||
messageId: "msg2",
|
||||
lamportTimestamp: 2,
|
||||
causalHistory: @[HistoryEntry(messageId: "msg1", senderId: "sender-A")],
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
let msg = SdsMessage.init(
|
||||
messageId = "msg2",
|
||||
lamportTimestamp = 2,
|
||||
causalHistory = @[HistoryEntry(messageId: "msg1", senderId: "sender-A")],
|
||||
channelId = testChannel,
|
||||
content = @[byte(2)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
let serialized = serializeMessage(msg).get()
|
||||
@ -1078,25 +1078,25 @@ suite "SDS-R: Repair Buffer Management":
|
||||
)
|
||||
|
||||
# First, create the missing dep scenario
|
||||
let msg2 = SdsMessage(
|
||||
messageId: "msg2",
|
||||
lamportTimestamp: 2,
|
||||
causalHistory: @[HistoryEntry(messageId: "msg1", senderId: "sender-A")],
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
let msg2 = SdsMessage.init(
|
||||
messageId = "msg2",
|
||||
lamportTimestamp = 2,
|
||||
causalHistory = @[HistoryEntry(messageId: "msg1", senderId: "sender-A")],
|
||||
channelId = testChannel,
|
||||
content = @[byte(2)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
discard rm.unwrapReceivedMessage(serializeMessage(msg2).get())
|
||||
check "msg1" in rm.channels[testChannel].outgoingRepairBuffer
|
||||
|
||||
# Now receive msg1 — should clear from repair buffer
|
||||
let msg1 = SdsMessage(
|
||||
messageId: "msg1",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(1)],
|
||||
bloomFilter: @[],
|
||||
let msg1 = SdsMessage.init(
|
||||
messageId = "msg1",
|
||||
lamportTimestamp = 1,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(1)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
discard rm.unwrapReceivedMessage(serializeMessage(msg1).get())
|
||||
check "msg1" notin rm.channels[testChannel].outgoingRepairBuffer
|
||||
@ -1108,13 +1108,13 @@ suite "SDS-R: Repair Buffer Management":
|
||||
proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = discard,
|
||||
)
|
||||
|
||||
let msg2 = SdsMessage(
|
||||
messageId: "msg2",
|
||||
lamportTimestamp: 2,
|
||||
causalHistory: @[HistoryEntry(messageId: "msg1", senderId: "sender-A")],
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
let msg2 = SdsMessage.init(
|
||||
messageId = "msg2",
|
||||
lamportTimestamp = 2,
|
||||
causalHistory = @[HistoryEntry(messageId: "msg1", senderId: "sender-A")],
|
||||
channelId = testChannel,
|
||||
content = @[byte(2)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
discard rm.unwrapReceivedMessage(serializeMessage(msg2).get())
|
||||
check "msg1" in rm.channels[testChannel].outgoingRepairBuffer
|
||||
@ -1195,25 +1195,25 @@ suite "SDS-R: Repair Buffer Management":
|
||||
let channel = rm.channels[testChannel]
|
||||
|
||||
# First, seed delivered history so we can respond to a repair request for it
|
||||
let cachedMsg = SdsMessage(
|
||||
messageId: "cached-msg",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(99)],
|
||||
bloomFilter: @[],
|
||||
let cachedMsg = SdsMessage.init(
|
||||
messageId = "cached-msg",
|
||||
lamportTimestamp = 1,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(99)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
channel.messageHistory["cached-msg"] = cachedMsg
|
||||
|
||||
# Receive a message with a repair request for "cached-msg"
|
||||
let msgWithRepair = SdsMessage(
|
||||
messageId: "requester-msg",
|
||||
lamportTimestamp: 5,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(3)],
|
||||
bloomFilter: @[],
|
||||
repairRequest: @[HistoryEntry(
|
||||
let msgWithRepair = SdsMessage.init(
|
||||
messageId = "requester-msg",
|
||||
lamportTimestamp = 5,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(3)],
|
||||
bloomFilter = @[],
|
||||
repairRequest = @[HistoryEntry(
|
||||
messageId: "cached-msg",
|
||||
senderId: "test-participant", # Same as our participantId so we're in response group
|
||||
)],
|
||||
@ -1225,16 +1225,16 @@ suite "SDS-R: Repair Buffer Management":
|
||||
|
||||
suite "SDS-R: Protobuf Roundtrip":
|
||||
test "senderId in HistoryEntry roundtrips through protobuf":
|
||||
let msg = SdsMessage(
|
||||
messageId: "msg1",
|
||||
lamportTimestamp: 100,
|
||||
causalHistory: @[
|
||||
let msg = SdsMessage.init(
|
||||
messageId = "msg1",
|
||||
lamportTimestamp = 100,
|
||||
causalHistory = @[
|
||||
HistoryEntry(messageId: "dep1", retrievalHint: @[byte(1), 2], senderId: "sender-A"),
|
||||
HistoryEntry(messageId: "dep2", senderId: "sender-B"),
|
||||
],
|
||||
channelId: "ch1",
|
||||
content: @[byte(42)],
|
||||
bloomFilter: @[],
|
||||
channelId = "ch1",
|
||||
content = @[byte(42)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
let serialized = serializeMessage(msg).get()
|
||||
@ -1249,14 +1249,14 @@ suite "SDS-R: Protobuf Roundtrip":
|
||||
decoded.causalHistory[1].senderId == "sender-B"
|
||||
|
||||
test "repairRequest field roundtrips through protobuf":
|
||||
let msg = SdsMessage(
|
||||
messageId: "msg1",
|
||||
lamportTimestamp: 100,
|
||||
causalHistory: @[],
|
||||
channelId: "ch1",
|
||||
content: @[byte(42)],
|
||||
bloomFilter: @[],
|
||||
repairRequest: @[
|
||||
let msg = SdsMessage.init(
|
||||
messageId = "msg1",
|
||||
lamportTimestamp = 100,
|
||||
causalHistory = @[],
|
||||
channelId = "ch1",
|
||||
content = @[byte(42)],
|
||||
bloomFilter = @[],
|
||||
repairRequest = @[
|
||||
HistoryEntry(messageId: "missing1", senderId: "sender-X"),
|
||||
HistoryEntry(messageId: "missing2", senderId: "sender-Y", retrievalHint: @[byte(5)]),
|
||||
],
|
||||
@ -1274,13 +1274,13 @@ suite "SDS-R: Protobuf Roundtrip":
|
||||
decoded.repairRequest[1].retrievalHint == @[byte(5)]
|
||||
|
||||
test "backward compat: message without repairRequest decodes fine":
|
||||
let msg = SdsMessage(
|
||||
messageId: "msg1",
|
||||
lamportTimestamp: 100,
|
||||
causalHistory: @[HistoryEntry(messageId: "dep1")],
|
||||
channelId: "ch1",
|
||||
content: @[byte(42)],
|
||||
bloomFilter: @[],
|
||||
let msg = SdsMessage.init(
|
||||
messageId = "msg1",
|
||||
lamportTimestamp = 100,
|
||||
causalHistory = @[HistoryEntry(messageId: "dep1")],
|
||||
channelId = "ch1",
|
||||
content = @[byte(42)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
let serialized = serializeMessage(msg).get()
|
||||
@ -1291,14 +1291,14 @@ suite "SDS-R: Protobuf Roundtrip":
|
||||
decoded.causalHistory[0].senderId == ""
|
||||
|
||||
test "SdsMessage.senderId roundtrips through protobuf":
|
||||
let msg = SdsMessage(
|
||||
messageId: "m1",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: @[],
|
||||
channelId: "ch1",
|
||||
content: @[byte(1)],
|
||||
bloomFilter: @[],
|
||||
senderId: "alice",
|
||||
let msg = SdsMessage.init(
|
||||
messageId = "m1",
|
||||
lamportTimestamp = 1,
|
||||
causalHistory = @[],
|
||||
channelId = "ch1",
|
||||
content = @[byte(1)],
|
||||
bloomFilter = @[],
|
||||
senderId = "alice",
|
||||
)
|
||||
let decoded = deserializeMessage(serializeMessage(msg).get()).get()
|
||||
check decoded.senderId == "alice"
|
||||
@ -1346,7 +1346,7 @@ suite "SDS-R: Edge Cases and Defensive Branches":
|
||||
d.inMilliseconds < tMax.inMilliseconds
|
||||
|
||||
test "response group distribution is roughly uniform":
|
||||
# With numGroups=10, ~10% of random participants should share sender's group.
|
||||
# With numGroups =10, ~10% of random participants should share sender's group.
|
||||
const numGroups = 10
|
||||
const totalParticipants = 1000
|
||||
let senderId = "alice"
|
||||
@ -1361,7 +1361,7 @@ suite "SDS-R: Edge Cases and Defensive Branches":
|
||||
sameGroup <= 200
|
||||
|
||||
test "computeTResp monotonicity: self always fastest":
|
||||
# The original sender (distance=0) must always be first to respond.
|
||||
# The original sender (distance =0) must always be first to respond.
|
||||
let tMax = initDuration(seconds = 300)
|
||||
let selfD = computeTResp("alice", "alice", "msg-xyz", tMax)
|
||||
check selfD.inMilliseconds == 0
|
||||
@ -1381,13 +1381,13 @@ suite "SDS-R: Lifecycle and State":
|
||||
proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} = discard,
|
||||
)
|
||||
|
||||
let msg = SdsMessage(
|
||||
messageId: "m2",
|
||||
lamportTimestamp: 2,
|
||||
causalHistory: @[HistoryEntry(messageId: "m1-missing", senderId: "alice")],
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
let msg = SdsMessage.init(
|
||||
messageId = "m2",
|
||||
lamportTimestamp = 2,
|
||||
causalHistory = @[HistoryEntry(messageId: "m1-missing", senderId: "alice")],
|
||||
channelId = testChannel,
|
||||
content = @[byte(2)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
discard rm.unwrapReceivedMessage(serializeMessage(msg).get())
|
||||
check rm.channels[testChannel].outgoingRepairBuffer.len == 0
|
||||
@ -1397,13 +1397,13 @@ suite "SDS-R: Lifecycle and State":
|
||||
defer: rm.cleanup()
|
||||
check rm.ensureChannel(testChannel).isOk()
|
||||
let channel = rm.channels[testChannel]
|
||||
channel.messageHistory["m-wanted"] = SdsMessage(
|
||||
messageId: "m-wanted",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(99), 99, 99],
|
||||
bloomFilter: @[],
|
||||
channel.messageHistory["m-wanted"] = SdsMessage.init(
|
||||
messageId = "m-wanted",
|
||||
lamportTimestamp = 1,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(99), 99, 99],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
rm.setCallbacks(
|
||||
@ -1412,14 +1412,14 @@ suite "SDS-R: Lifecycle and State":
|
||||
proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} = discard,
|
||||
)
|
||||
|
||||
let msg = SdsMessage(
|
||||
messageId: "req-msg",
|
||||
lamportTimestamp: 5,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(1)],
|
||||
bloomFilter: @[],
|
||||
repairRequest: @[HistoryEntry(messageId: "m-wanted", senderId: "")],
|
||||
let msg = SdsMessage.init(
|
||||
messageId = "req-msg",
|
||||
lamportTimestamp = 5,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(1)],
|
||||
bloomFilter = @[],
|
||||
repairRequest = @[HistoryEntry(messageId: "m-wanted", senderId: "")],
|
||||
)
|
||||
discard rm.unwrapReceivedMessage(serializeMessage(msg).get())
|
||||
check "m-wanted" notin channel.incomingRepairBuffer
|
||||
@ -1468,14 +1468,14 @@ suite "SDS-R: Lifecycle and State":
|
||||
cachedMessage: @[byte(1)],
|
||||
minTimeRepairResp: getTime(),
|
||||
)
|
||||
channel.messageHistory["c"] = SdsMessage(
|
||||
messageId: "c",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
senderId: "someone",
|
||||
channel.messageHistory["c"] = SdsMessage.init(
|
||||
messageId = "c",
|
||||
lamportTimestamp = 1,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(2)],
|
||||
bloomFilter = @[],
|
||||
senderId = "someone",
|
||||
)
|
||||
|
||||
check rm.resetReliabilityManager().isOk()
|
||||
@ -1498,13 +1498,13 @@ suite "SDS-R: Lifecycle and State":
|
||||
proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} = discard,
|
||||
)
|
||||
|
||||
let msg = SdsMessage(
|
||||
messageId: "m2",
|
||||
lamportTimestamp: 2,
|
||||
causalHistory: @[HistoryEntry(messageId: "m1-missing", senderId: "bob")],
|
||||
channelId: "ch-A",
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
let msg = SdsMessage.init(
|
||||
messageId = "m2",
|
||||
lamportTimestamp = 2,
|
||||
causalHistory = @[HistoryEntry(messageId: "m1-missing", senderId: "bob")],
|
||||
channelId = "ch-A",
|
||||
content = @[byte(2)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
discard rm.unwrapReceivedMessage(serializeMessage(msg).get())
|
||||
check:
|
||||
@ -1526,13 +1526,13 @@ suite "SDS-R: Lifecycle and State":
|
||||
)
|
||||
|
||||
# Carol already has M1 in history and has a pending incomingRepairBuffer entry
|
||||
channel.messageHistory["m1"] = SdsMessage(
|
||||
messageId: "m1",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(1)],
|
||||
bloomFilter: @[],
|
||||
channel.messageHistory["m1"] = SdsMessage.init(
|
||||
messageId = "m1",
|
||||
lamportTimestamp = 1,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(1)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
channel.incomingRepairBuffer["m1"] = IncomingRepairEntry(
|
||||
inHistEntry: HistoryEntry(messageId: "m1", senderId: "alice"),
|
||||
@ -1541,14 +1541,14 @@ suite "SDS-R: Lifecycle and State":
|
||||
)
|
||||
|
||||
# A rebroadcast of M1 arrives
|
||||
let msg = SdsMessage(
|
||||
messageId: "m1",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(1)],
|
||||
bloomFilter: @[],
|
||||
senderId: "alice",
|
||||
let msg = SdsMessage.init(
|
||||
messageId = "m1",
|
||||
lamportTimestamp = 1,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(1)],
|
||||
bloomFilter = @[],
|
||||
senderId = "alice",
|
||||
)
|
||||
discard rm.unwrapReceivedMessage(serializeMessage(msg).get())
|
||||
check "m1" notin channel.incomingRepairBuffer
|
||||
@ -1748,11 +1748,11 @@ suite "SDS-R: Multi-Participant Integration":
|
||||
# Force Bob's T_req so the next wrap attaches the repair request.
|
||||
bob.forceOutgoingExpired("m1")
|
||||
|
||||
# Bob sends M3 — it must carry repair_request=[M1, sender=alice].
|
||||
# Bob sends M3 — it must carry repair_request =[M1, sender =alice].
|
||||
bus.broadcast("bob", @[byte(3)], "m3")
|
||||
|
||||
# Alice received M3, saw the repair_request, cached-bypass and response-group
|
||||
# checks pass, so she has an incomingRepairBuffer entry for M1 with tResp=0.
|
||||
# checks pass, so she has an incomingRepairBuffer entry for M1 with tResp =0.
|
||||
check "m1" in alice.channels[testChannel].incomingRepairBuffer
|
||||
|
||||
# Force alice's tResp to past just to be safe (it's already 0 for self),
|
||||
@ -1786,7 +1786,7 @@ suite "SDS-R: Multi-Participant Integration":
|
||||
"m1" in alice.channels[testChannel].incomingRepairBuffer
|
||||
"m1" in carol.channels[testChannel].incomingRepairBuffer
|
||||
|
||||
# Alice fires first (T_resp=0 for self). Her rebroadcast should cancel Carol's
|
||||
# Alice fires first (T_resp =0 for self). Her rebroadcast should cancel Carol's
|
||||
# pending entry when Carol receives the rebroadcast.
|
||||
alice.forceIncomingExpired("m1")
|
||||
alice.runRepairSweep()
|
||||
@ -1803,7 +1803,7 @@ suite "SDS-R: Multi-Participant Integration":
|
||||
var m1RebroadcastCount = 0
|
||||
for entry in bus.wireLog:
|
||||
if entry.messageId == "m1" and entry.senderId != "alice":
|
||||
discard # only the original Alice->all broadcast had senderId="alice"
|
||||
discard # only the original Alice->all broadcast had senderId ="alice"
|
||||
if entry.messageId == "m1":
|
||||
m1RebroadcastCount += 1
|
||||
# Two "m1" entries total on wire: (1) Alice's original broadcast, (2) Alice's rebroadcast.
|
||||
@ -1835,7 +1835,7 @@ suite "SDS-R: Multi-Participant Integration":
|
||||
check "m1" notin carol.channels[testChannel].outgoingRepairBuffer
|
||||
|
||||
test "response group filtering: only group members respond":
|
||||
# With numGroups=10, roughly 1/10 of receivers will be in the group.
|
||||
# With numGroups =10, roughly 1/10 of receivers will be in the group.
|
||||
# Construct a sender+message where a specific non-sender is NOT in the group.
|
||||
var cfg = defaultConfig()
|
||||
cfg.numResponseGroups = 10
|
||||
@ -1865,15 +1865,15 @@ suite "SDS-R: Multi-Participant Integration":
|
||||
# We inject directly by calling unwrapReceivedMessage on bob/carol.
|
||||
let dave = bus.addPeer("dave", cfg)
|
||||
# Dave has no messages, but we can hand-craft a repair request he would send.
|
||||
let reqMsg = SdsMessage(
|
||||
messageId: "req-from-dave",
|
||||
lamportTimestamp: 10,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(9)],
|
||||
bloomFilter: @[],
|
||||
senderId: "dave",
|
||||
repairRequest: @[HistoryEntry(messageId: chosenMsg, senderId: "alice")],
|
||||
let reqMsg = SdsMessage.init(
|
||||
messageId = "req-from-dave",
|
||||
lamportTimestamp = 10,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(9)],
|
||||
bloomFilter = @[],
|
||||
senderId = "dave",
|
||||
repairRequest = @[HistoryEntry(messageId: chosenMsg, senderId: "alice")],
|
||||
)
|
||||
let bytes = serializeMessage(reqMsg).get()
|
||||
discard bob.unwrapReceivedMessage(bytes)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user