mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-01-02 14:13:07 +00:00
fix: revert to strings and fix tests
This commit is contained in:
parent
403974ec1a
commit
d71b68f915
@ -1,8 +1,8 @@
|
||||
import std/[times, options, sets]
|
||||
|
||||
type
|
||||
SdsMessageID* = seq[byte]
|
||||
SdsChannelID* = seq[byte]
|
||||
SdsMessageID* = string
|
||||
SdsChannelID* = string
|
||||
|
||||
SdsMessage* = object
|
||||
messageId*: SdsMessageID
|
||||
|
||||
@ -32,12 +32,12 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] =
|
||||
return err(ProtobufError.missingRequiredField("lamportTimestamp"))
|
||||
msg.lamportTimestamp = int64(timestamp)
|
||||
|
||||
var causalHistory: seq[seq[byte]]
|
||||
var causalHistory: seq[SdsMessageID]
|
||||
let histResult = pb.getRepeatedField(3, causalHistory)
|
||||
if histResult.isOk:
|
||||
msg.causalHistory = causalHistory
|
||||
|
||||
var channelId: seq[byte]
|
||||
var channelId: SdsChannelID
|
||||
if ?pb.getField(4, channelId):
|
||||
msg.channelId = some(channelId)
|
||||
else:
|
||||
|
||||
@ -12,6 +12,12 @@ type
|
||||
|
||||
PeriodicSyncCallback* = proc() {.gcsafe, raises: [].}
|
||||
|
||||
AppCallbacks* = ref object
|
||||
messageReadyCb*: MessageReadyCallback
|
||||
messageSentCb*: MessageSentCallback
|
||||
missingDependenciesCb*: MissingDependenciesCallback
|
||||
periodicSyncCb*: PeriodicSyncCallback
|
||||
|
||||
ReliabilityConfig* = object
|
||||
bloomFilterCapacity*: int
|
||||
bloomFilterErrorRate*: float
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import unittest, results, chronos, std/times
|
||||
import unittest, results, chronos, std/[times, options, tables]
|
||||
import ../src/[reliability, message, protobuf, reliability_utils, rolling_bloom_filter]
|
||||
|
||||
# Core functionality tests
|
||||
@ -6,7 +6,7 @@ suite "Core Operations":
|
||||
var rm: ReliabilityManager
|
||||
|
||||
setup:
|
||||
let rmResult = newReliabilityManager("testChannel")
|
||||
let rmResult = newReliabilityManager(some("testChannel"))
|
||||
check rmResult.isOk()
|
||||
rm = rmResult.get()
|
||||
|
||||
@ -19,7 +19,6 @@ suite "Core Operations":
|
||||
check:
|
||||
config.bloomFilterCapacity == DefaultBloomFilterCapacity
|
||||
config.bloomFilterErrorRate == DefaultBloomFilterErrorRate
|
||||
config.bloomFilterWindow == DefaultBloomFilterWindow
|
||||
config.maxMessageHistory == DefaultMaxMessageHistory
|
||||
|
||||
test "basic message wrapping and unwrapping":
|
||||
@ -40,20 +39,20 @@ suite "Core Operations":
|
||||
|
||||
test "message ordering":
|
||||
# Create messages with different timestamps
|
||||
let msg1 = Message(
|
||||
let msg1 = SdsMessage(
|
||||
messageId: "msg1",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: @[],
|
||||
channelId: "testChannel",
|
||||
channelId: some("testChannel"),
|
||||
content: @[byte(1)],
|
||||
bloomFilter: @[],
|
||||
)
|
||||
|
||||
let msg2 = Message(
|
||||
let msg2 = SdsMessage(
|
||||
messageId: "msg2",
|
||||
lamportTimestamp: 5,
|
||||
causalHistory: @[],
|
||||
channelId: "testChannel",
|
||||
channelId: some("testChannel"),
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
)
|
||||
@ -77,7 +76,7 @@ suite "Reliability Mechanisms":
|
||||
var rm: ReliabilityManager
|
||||
|
||||
setup:
|
||||
let rmResult = newReliabilityManager("testChannel")
|
||||
let rmResult = newReliabilityManager(some("testChannel"))
|
||||
check rmResult.isOk()
|
||||
rm = rmResult.get()
|
||||
|
||||
@ -91,11 +90,11 @@ suite "Reliability Mechanisms":
|
||||
var missingDepsCount = 0
|
||||
|
||||
rm.setCallbacks(
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
messageReadyCount += 1,
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
messageSentCount += 1,
|
||||
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
|
||||
missingDepsCount += 1,
|
||||
)
|
||||
|
||||
@ -105,20 +104,20 @@ suite "Reliability Mechanisms":
|
||||
let id3 = "msg3"
|
||||
|
||||
# Create messages with dependencies
|
||||
let msg2 = Message(
|
||||
let msg2 = SdsMessage(
|
||||
messageId: id2,
|
||||
lamportTimestamp: 2,
|
||||
causalHistory: @[id1], # msg2 depends on msg1
|
||||
channelId: "testChannel",
|
||||
channelId: some("testChannel"),
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
)
|
||||
|
||||
let msg3 = Message(
|
||||
let msg3 = SdsMessage(
|
||||
messageId: id3,
|
||||
lamportTimestamp: 3,
|
||||
causalHistory: @[id1, id2], # msg3 depends on both msg1 and msg2
|
||||
channelId: "testChannel",
|
||||
channelId: some("testChannel"),
|
||||
content: @[byte(3)],
|
||||
bloomFilter: @[],
|
||||
)
|
||||
@ -168,11 +167,11 @@ suite "Reliability Mechanisms":
|
||||
var missingDepsCount = 0
|
||||
|
||||
rm.setCallbacks(
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
messageReadyCount += 1,
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
messageSentCount += 1,
|
||||
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
|
||||
missingDepsCount += 1,
|
||||
)
|
||||
|
||||
@ -183,11 +182,11 @@ suite "Reliability Mechanisms":
|
||||
check wrap1.isOk()
|
||||
|
||||
# Create a message that has our message in causal history
|
||||
let msg2 = Message(
|
||||
let msg2 = SdsMessage(
|
||||
messageId: "msg2",
|
||||
lamportTimestamp: rm.lamportTimestamp + 1,
|
||||
causalHistory: @[id1], # Include our message in causal history
|
||||
channelId: "testChannel",
|
||||
channelId: some("testChannel"),
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[] # Test with an empty bloom filter
|
||||
,
|
||||
@ -208,11 +207,11 @@ suite "Reliability Mechanisms":
|
||||
var messageSentCount = 0
|
||||
|
||||
rm.setCallbacks(
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
discard,
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
messageSentCount += 1,
|
||||
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
|
||||
discard,
|
||||
)
|
||||
|
||||
@ -223,19 +222,18 @@ suite "Reliability Mechanisms":
|
||||
check wrap1.isOk()
|
||||
|
||||
# Create a message with bloom filter containing our message
|
||||
var otherPartyBloomFilter = newRollingBloomFilter(
|
||||
DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate, DefaultBloomFilterWindow
|
||||
)
|
||||
var otherPartyBloomFilter =
|
||||
newRollingBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
|
||||
otherPartyBloomFilter.add(id1)
|
||||
|
||||
let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter)
|
||||
check bfResult.isOk()
|
||||
|
||||
let msg2 = Message(
|
||||
let msg2 = SdsMessage(
|
||||
messageId: "msg2",
|
||||
lamportTimestamp: rm.lamportTimestamp + 1,
|
||||
causalHistory: @[], # Empty causal history as we're using bloom filter
|
||||
channelId: "testChannel",
|
||||
channelId: some("testChannel"),
|
||||
content: @[byte(2)],
|
||||
bloomFilter: bfResult.get(),
|
||||
)
|
||||
@ -253,7 +251,7 @@ suite "Periodic Tasks & Buffer Management":
|
||||
var rm: ReliabilityManager
|
||||
|
||||
setup:
|
||||
let rmResult = newReliabilityManager("testChannel")
|
||||
let rmResult = newReliabilityManager(some("testChannel"))
|
||||
check rmResult.isOk()
|
||||
rm = rmResult.get()
|
||||
|
||||
@ -265,11 +263,11 @@ suite "Periodic Tasks & Buffer Management":
|
||||
var messageSentCount = 0
|
||||
|
||||
rm.setCallbacks(
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
discard,
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
messageSentCount += 1,
|
||||
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
|
||||
discard,
|
||||
)
|
||||
|
||||
@ -284,11 +282,11 @@ suite "Periodic Tasks & Buffer Management":
|
||||
check outBuffer.len == 6
|
||||
|
||||
# Create message that acknowledges some messages
|
||||
let ackMsg = Message(
|
||||
let ackMsg = SdsMessage(
|
||||
messageId: "ack1",
|
||||
lamportTimestamp: rm.lamportTimestamp + 1,
|
||||
causalHistory: @["msg0", "msg2", "msg4"],
|
||||
channelId: "testChannel",
|
||||
channelId: some("testChannel"),
|
||||
content: @[byte(100)],
|
||||
bloomFilter: @[],
|
||||
)
|
||||
@ -311,19 +309,19 @@ suite "Periodic Tasks & Buffer Management":
|
||||
var config = defaultConfig()
|
||||
config.resendInterval = initDuration(milliseconds = 100) # Short for testing
|
||||
config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps
|
||||
config.bloomFilterWindow = initDuration(milliseconds = 150) # Short window
|
||||
config.bloomFilterCapacity = 2 # Small capacity for testing
|
||||
config.maxResendAttempts = 3 # Set a low number of max attempts
|
||||
|
||||
let rmResultP = newReliabilityManager("testChannel", config)
|
||||
let rmResultP = newReliabilityManager(some("testChannel"), config)
|
||||
check rmResultP.isOk()
|
||||
let rm = rmResultP.get()
|
||||
|
||||
rm.setCallbacks(
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
discard,
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
messageSentCount += 1,
|
||||
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
|
||||
discard,
|
||||
)
|
||||
|
||||
@ -340,34 +338,39 @@ suite "Periodic Tasks & Buffer Management":
|
||||
|
||||
rm.startPeriodicTasks()
|
||||
|
||||
# Wait long enough for bloom filter window to pass and first message to exceed max retries
|
||||
# Wait long enough for bloom filter
|
||||
waitFor sleepAsync(chronos.milliseconds(500))
|
||||
|
||||
# Add new message
|
||||
# Add new messages
|
||||
let msg2 = @[byte(2)]
|
||||
let id2 = "msg2"
|
||||
let wrap2 = rm.wrapOutgoingMessage(msg2, id2)
|
||||
check wrap2.isOk()
|
||||
|
||||
let msg3 = @[byte(3)]
|
||||
let id3 = "msg3"
|
||||
let wrap3 = rm.wrapOutgoingMessage(msg3, id3)
|
||||
check wrap3.isOk()
|
||||
|
||||
let finalBuffer = rm.getOutgoingBuffer()
|
||||
check:
|
||||
finalBuffer.len == 1
|
||||
# Only msg2 should be in buffer, msg1 should be removed after max retries
|
||||
finalBuffer.len == 2
|
||||
# Only msg2 and msg3 should be in buffer, msg1 should be removed after max retries
|
||||
finalBuffer[0].message.messageId == id2 # Verify it's the second message
|
||||
finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts
|
||||
not rm.bloomFilter.contains(id1) # Bloom filter cleaning check
|
||||
rm.bloomFilter.contains(id2) # New message still in filter
|
||||
rm.bloomFilter.contains(id3) # New message still in filter
|
||||
|
||||
rm.cleanup()
|
||||
|
||||
test "periodic sync callback":
|
||||
var syncCallCount = 0
|
||||
rm.setCallbacks(
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
discard,
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
discard,
|
||||
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
|
||||
discard,
|
||||
proc() {.gcsafe.} =
|
||||
syncCallCount += 1,
|
||||
@ -384,7 +387,7 @@ suite "Special Cases Handling":
|
||||
var rm: ReliabilityManager
|
||||
|
||||
setup:
|
||||
let rmResult = newReliabilityManager("testChannel")
|
||||
let rmResult = newReliabilityManager(some("testChannel"))
|
||||
check rmResult.isOk()
|
||||
rm = rmResult.get()
|
||||
|
||||
@ -406,11 +409,11 @@ suite "Special Cases Handling":
|
||||
history[^1] == "msg" & $(rm.config.maxMessageHistory + 5)
|
||||
|
||||
test "invalid bloom filter handling":
|
||||
let msgInvalid = Message(
|
||||
let msgInvalid = SdsMessage(
|
||||
messageId: "invalid-bf",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: @[],
|
||||
channelId: "testChannel",
|
||||
channelId: some("testChannel"),
|
||||
content: @[byte(1)],
|
||||
bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data
|
||||
,
|
||||
@ -428,20 +431,20 @@ suite "Special Cases Handling":
|
||||
test "duplicate message handling":
|
||||
var messageReadyCount = 0
|
||||
rm.setCallbacks(
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
messageReadyCount += 1,
|
||||
proc(messageId: MessageID) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID) {.gcsafe.} =
|
||||
discard,
|
||||
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
|
||||
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
|
||||
discard,
|
||||
)
|
||||
|
||||
# Create and process a message
|
||||
let msg = Message(
|
||||
let msg = SdsMessage(
|
||||
messageId: "dup-msg",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: @[],
|
||||
channelId: "testChannel",
|
||||
channelId: some("testChannel"),
|
||||
content: @[byte(1)],
|
||||
bloomFilter: @[],
|
||||
)
|
||||
@ -475,7 +478,7 @@ suite "Special Cases Handling":
|
||||
|
||||
suite "cleanup":
|
||||
test "cleanup works correctly":
|
||||
let rmResult = newReliabilityManager("testChannel")
|
||||
let rmResult = newReliabilityManager(some("testChannel"))
|
||||
check rmResult.isOk()
|
||||
let rm = rmResult.get()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user