diff --git a/sds.nim b/sds.nim index 30392e1..75e01a7 100644 --- a/sds.nim +++ b/sds.nim @@ -472,7 +472,6 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE try: for channelId, channel in rm.channels: rm.dropChannelFromPersistence(channelId, channel) - rm.persistence.saveLamport(channelId, 0) channel.lamportTimestamp = 0 channel.messageHistory.clear() channel.outgoingBuffer.setLen(0) diff --git a/sds/sds_utils.nim b/sds/sds_utils.nim index bf1fc8c..d3b593b 100644 --- a/sds/sds_utils.nim +++ b/sds/sds_utils.nim @@ -30,6 +30,7 @@ proc dropChannelFromPersistence*( rm.persistence.removeOutgoingRepair(channelId, msgId) for msgId in channel.incomingRepairBuffer.keys: rm.persistence.removeIncomingRepair(channelId, msgId) + rm.persistence.saveLamport(channelId, 0) proc cleanup*(rm: ReliabilityManager) {.raises: [].} = ## Releases in-memory state. Does NOT wipe persistence — the manager may be diff --git a/tests/test_persistence.nim b/tests/test_persistence.nim index 020996e..fc5e36a 100644 --- a/tests/test_persistence.nim +++ b/tests/test_persistence.nim @@ -1,4 +1,4 @@ -import unittest, results, std/tables +import unittest, results, std/[tables, sets, times] import sds import ./in_memory_persistence @@ -98,8 +98,10 @@ suite "Persistence: write → restart → read-back": check rm.ensureChannel(testChannel).isOk() discard rm.wrapOutgoingMessage(@[1.byte], "msg-r", testChannel) check store.outgoing[testChannel].len == 1 + check store.lamports[testChannel] > 0 check rm.removeChannel(testChannel).isOk() check store.outgoing[testChannel].len == 0 + check store.lamports[testChannel] == 0 rm.cleanup() test "noOpPersistence keeps existing manager working": @@ -109,3 +111,227 @@ suite "Persistence: write → restart → read-back": check wrapped.isOk() check rm.getOutgoingBuffer(testChannel).len == 1 rm.cleanup() + + test "continue operating after restart: lamport stays monotonic": + let store = newInMemoryStore() + let p1 = newInMemoryPersistence(store) + let rm1 = newReliabilityManager(persistence = p1).get() + check rm1.ensureChannel(testChannel).isOk() + discard rm1.wrapOutgoingMessage(@[1.byte], "m1", testChannel) + let lamportAfterSession1 = store.lamports[testChannel] + check lamportAfterSession1 > 0 + rm1.cleanup() + + # Restart and send another message — lamport must not regress. + let p2 = newInMemoryPersistence(store) + let rm2 = newReliabilityManager(persistence = p2).get() + check rm2.ensureChannel(testChannel).isOk() + check rm2.channels[testChannel].lamportTimestamp == lamportAfterSession1 + discard rm2.wrapOutgoingMessage(@[2.byte], "m2", testChannel) + check store.lamports[testChannel] > lamportAfterSession1 + check rm2.getOutgoingBuffer(testChannel).len == 2 + rm2.cleanup() + + test "multiple restart cycles preserve state": + let store = newInMemoryStore() + for i in 1 .. 3: + let p = newInMemoryPersistence(store) + let rm = newReliabilityManager(persistence = p).get() + check rm.ensureChannel(testChannel).isOk() + discard rm.wrapOutgoingMessage(@[byte(i)], "m" & $i, testChannel) + rm.cleanup() + + # Final session: all three messages must be in the buffer. + let pFinal = newInMemoryPersistence(store) + let rmFinal = newReliabilityManager(persistence = pFinal).get() + check rmFinal.ensureChannel(testChannel).isOk() + let buf = rmFinal.getOutgoingBuffer(testChannel) + check buf.len == 3 + var ids = newSeq[string]() + for unack in buf: + ids.add(unack.message.messageId.string) + check "m1" in ids + check "m2" in ids + check "m3" in ids + rmFinal.cleanup() + + test "incoming dep-waiting buffer survives restart with missingDeps intact": + let store = newInMemoryStore() + let p1 = newInMemoryPersistence(store) + let rm1 = newReliabilityManager(persistence = p1).get() + check rm1.ensureChannel(testChannel).isOk() + + # Receive a message whose causal-history references an unknown predecessor. + let depMsg = SdsMessage.init( + messageId = "msg-with-deps", + lamportTimestamp = 10, + causalHistory = @[HistoryEntry.init("missing-dep", @[])], + channelId = testChannel, + content = @[7.byte], + bloomFilter = @[], + senderId = "carol", + ) + let serialized = serializeMessage(depMsg).get() + discard rm1.unwrapReceivedMessage(serialized) + check "msg-with-deps" in store.incoming[testChannel] + rm1.cleanup() + + # Restart — buffered message and its missing-deps set must be back. + let p2 = newInMemoryPersistence(store) + let rm2 = newReliabilityManager(persistence = p2).get() + check rm2.ensureChannel(testChannel).isOk() + let inbuf = rm2.getIncomingBuffer(testChannel) + check "msg-with-deps" in inbuf + check "missing-dep" in inbuf["msg-with-deps"].missingDeps + rm2.cleanup() + + test "removeChannel + recreate does not inherit stale lamport": + # Regression: dropChannelFromPersistence used to skip saveLamport(0), + # so the channels row survived removeChannel and a recreate inherited it. + let store = newInMemoryStore() + let p1 = newInMemoryPersistence(store) + let rm1 = newReliabilityManager(persistence = p1).get() + check rm1.ensureChannel(testChannel).isOk() + discard rm1.wrapOutgoingMessage(@[1.byte], "m-old", testChannel) + check store.lamports[testChannel] > 0 + check rm1.removeChannel(testChannel).isOk() + check store.lamports[testChannel] == 0 + rm1.cleanup() + + # Recreate the same channelId after a restart — must start fresh. + let p2 = newInMemoryPersistence(store) + let rm2 = newReliabilityManager(persistence = p2).get() + check rm2.ensureChannel(testChannel).isOk() + check rm2.channels[testChannel].lamportTimestamp == 0 + check rm2.getOutgoingBuffer(testChannel).len == 0 + rm2.cleanup() + + test "SDS-R outgoing repair buffer survives restart with absolute t_req_at": + let store = newInMemoryStore() + let p1 = newInMemoryPersistence(store) + let rm1 = newReliabilityManager( + participantId = "alice", persistence = p1 + ).get() + check rm1.ensureChannel(testChannel).isOk() + + # Receive a message that references an unknown dep — triggers SDS-R repair. + let depMsg = SdsMessage.init( + messageId = "msg-needs-repair", + lamportTimestamp = 5, + causalHistory = @[HistoryEntry.init("missing-dep", @[])], + channelId = testChannel, + content = @[1.byte], + bloomFilter = @[], + senderId = "bob", + ) + discard rm1.unwrapReceivedMessage(serializeMessage(depMsg).get()) + check "missing-dep" in store.outgoingRepair[testChannel] + let originalTReqAt = store.outgoingRepair[testChannel]["missing-dep"].minTimeRepairReq + check originalTReqAt.toUnix > 0 + rm1.cleanup() + + # Restart — repair entry must be back with the SAME absolute time, not "now". + let p2 = newInMemoryPersistence(store) + let rm2 = newReliabilityManager( + participantId = "alice", persistence = p2 + ).get() + check rm2.ensureChannel(testChannel).isOk() + let buf = rm2.channels[testChannel].outgoingRepairBuffer + check "missing-dep" in buf + check buf["missing-dep"].minTimeRepairReq == originalTReqAt + rm2.cleanup() + + test "FIFO eviction state survives restart": + let store = newInMemoryStore() + var smallCfg = defaultConfig() + smallCfg.maxMessageHistory = 3 + smallCfg.bloomFilterCapacity = 3 + + let p1 = newInMemoryPersistence(store) + let rm1 = newReliabilityManager(config = smallCfg, persistence = p1).get() + check rm1.ensureChannel(testChannel).isOk() + # Add 5 delivered messages — first 2 should be evicted by FIFO. + for i in 1 .. 5: + let m = SdsMessage.init( + messageId = "m" & $i, + lamportTimestamp = int64(i), + causalHistory = @[], + channelId = testChannel, + content = @[byte(i)], + bloomFilter = @[], + senderId = "alice", + ) + rm1.addToHistory(m, testChannel) + check store.log[testChannel].len == 3 + check "m1" notin store.log[testChannel] + check "m2" notin store.log[testChannel] + rm1.cleanup() + + # Restart — evicted entries must NOT come back; survivors keep order. + let p2 = newInMemoryPersistence(store) + let rm2 = newReliabilityManager(config = smallCfg, persistence = p2).get() + check rm2.ensureChannel(testChannel).isOk() + let history = rm2.channels[testChannel].messageHistory + check history.len == 3 + check "m1" notin history + check "m2" notin history + check "m3" in history + check "m5" in history + # FIFO continues correctly after restart: adding m6 evicts m3, not a stale entry. + let m6 = SdsMessage.init( + messageId = "m6", lamportTimestamp = 6, causalHistory = @[], + channelId = testChannel, content = @[6.byte], + bloomFilter = @[], senderId = "alice", + ) + rm2.addToHistory(m6, testChannel) + check "m3" notin store.log[testChannel] + check "m6" in store.log[testChannel] + rm2.cleanup() + + test "dep-clear cascade resumes correctly across a restart": + let store = newInMemoryStore() + let p1 = newInMemoryPersistence(store) + let rm1 = newReliabilityManager(persistence = p1).get() + check rm1.ensureChannel(testChannel).isOk() + + # Receive c (deps on b), then b (deps on a). Both must buffer. + let msgC = SdsMessage.init( + messageId = "c", lamportTimestamp = 30, + causalHistory = @[HistoryEntry.init("b", @[])], + channelId = testChannel, content = @[3.byte], + bloomFilter = @[], senderId = "carol", + ) + let msgB = SdsMessage.init( + messageId = "b", lamportTimestamp = 20, + causalHistory = @[HistoryEntry.init("a", @[])], + channelId = testChannel, content = @[2.byte], + bloomFilter = @[], senderId = "bob", + ) + discard rm1.unwrapReceivedMessage(serializeMessage(msgC).get()) + discard rm1.unwrapReceivedMessage(serializeMessage(msgB).get()) + check "c" in store.incoming[testChannel] + check "b" in store.incoming[testChannel] + rm1.cleanup() + + # Restart — both still buffered, with intact missingDeps. + let p2 = newInMemoryPersistence(store) + let rm2 = newReliabilityManager(persistence = p2).get() + check rm2.ensureChannel(testChannel).isOk() + let inbuf = rm2.getIncomingBuffer(testChannel) + check "c" in inbuf + check "b" in inbuf + + # Now receive a (root) — should cascade-deliver a, b, c. + let msgA = SdsMessage.init( + messageId = "a", lamportTimestamp = 10, causalHistory = @[], + channelId = testChannel, content = @[1.byte], + bloomFilter = @[], senderId = "alice", + ) + discard rm2.unwrapReceivedMessage(serializeMessage(msgA).get()) + let history = rm2.channels[testChannel].messageHistory + check "a" in history + check "b" in history + check "c" in history + # Buffer should be drained. + check rm2.getIncomingBuffer(testChannel).len == 0 + rm2.cleanup()