nim-sds/tests/test_persistence.nim
NagyZoltanPeter 23a0ea7a6f
refactor(persistence): delete legacy interface; rename PersistenceV2 -> Persistence (phase 3)
End-state of the snapshot-persistence refactor. The legacy 13-proc
Persistence interface and its noOpPersistence are gone; the 5-proc
snapshot-based interface (formerly PersistenceV2) takes their place under
the canonical name.

Source:
- sds/types/persistence.nim: replaced 13-proc contract with the 5-proc
  snapshot interface (saveChannelMeta, updateHistory, loadChannel,
  dropChannel, setRetrievalHint). noOpPersistence returns ok everywhere
  and an empty ChannelData on load.
- sds/types/persistence_v2.nim: removed.
- sds/types/reliability_manager.nim: dropped the second persistenceV2
  field; constructor takes a single `persistence: Persistence`.
- sds/sds_utils.nim: rm.persistenceV2.X -> rm.persistence.X; doc-comments
  updated.
- sds.nim: dropped the persistenceV2 parameter from newReliabilityManager.

Tests:
- tests/in_memory_persistence_v2.nim: removed; its content moved to...
- tests/in_memory_persistence.nim: replaces the old legacy mock with the
  snapshot adapter under the canonical filename. Same InMemoryStore
  shape so test assertions stay unchanged.
- tests/test_persistence.nim: ctor param renamed, suite name de-prefixed.

FFI smoke (`nimble libsdsDynamicMac`, refc/threads:on): builds clean.
All 4 test suites pass:
- test_bloom
- test_reliability
- test_persistence (17 V2 tests)
- test_snapshot_codec (13 codec round-trip tests)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 13:19:16 +02:00

433 lines
17 KiB
Nim

import results, std/[tables, sets, times]
import sds
import ./async_unittest
import ./in_memory_persistence
converter toParticipantID(s: string): SdsParticipantID =
  ## Test-only convenience: lets a plain string be passed wherever an
  ## `SdsParticipantID` is expected (e.g. `participantId = "alice"`).
  SdsParticipantID(s)
# Channel ID shared by every test in this file.
const testChannel = "testChannel"
proc newV2Manager(
    store: InMemoryStore, config = defaultConfig()
): ReliabilityManager =
  ## Test helper: builds a `ReliabilityManager` for participant "alice" whose
  ## persistence is the in-memory adapter wrapping `store`.
  ##
  ## Passing the same `store` to several calls is how the tests simulate a
  ## restart: each call yields a fresh manager over the same backing data.
  ## The "V2" in the name is historical (the snapshot interface was formerly
  ## called PersistenceV2); there is only one persistence interface now.
  ##
  ## The construction result is unwrapped with `.get()`.
  let built = newReliabilityManager(
    participantId = "alice",
    config = config,
    persistence = newInMemoryPersistence(store),
  )
  built.get()
suite "Persistence: write → restart → read-back":
  # Pattern used throughout: drive a manager against an InMemoryStore, call
  # cleanup() on it, then build a fresh manager over the SAME store to
  # simulate a process restart and assert the state was read back.
  # Every manager created by a test is cleaned up before the test ends.

  asyncTest "outgoing buffer survives restart":
    let store = newInMemoryStore()
    let rm1 = newV2Manager(store)
    check (await rm1.ensureChannel(testChannel)).isOk()

    let wrapped = await rm1.wrapOutgoingMessage(@[1.byte, 2, 3], "msg-1", testChannel)
    check wrapped.isOk()
    check store.outgoing[testChannel].len == 1
    check "msg-1" in store.outgoing[testChannel]
    await rm1.cleanup()

    # Simulate restart: fresh manager, same backend.
    let rm2 = newV2Manager(store)
    check (await rm2.ensureChannel(testChannel)).isOk()
    let buf = await rm2.getOutgoingBuffer(testChannel)
    check buf.len == 1
    check buf[0].message.messageId == "msg-1"
    await rm2.cleanup()

  asyncTest "lamport clock survives restart":
    let store = newInMemoryStore()
    let rm1 = newV2Manager(store)
    check (await rm1.ensureChannel(testChannel)).isOk()
    check (await rm1.updateLamportTimestamp(42, testChannel)).isOk()
    # updateLamportTimestamp is now pure; the mutation is persisted by the
    # next op-end save. Drive a wrap to force a trySaveMeta.
    discard await rm1.wrapOutgoingMessage(@[byte(1)], "tick", testChannel)
    # max(42,0)+1 then max(getTime().toUnix, 43)+1; whatever wrap sets is
    # what we'll see. We just assert it stayed monotonic.
    check store.lamports[testChannel] >= 43
    let savedLamport = store.lamports[testChannel]
    await rm1.cleanup()

    let rm2 = newV2Manager(store)
    check (await rm2.ensureChannel(testChannel)).isOk()
    check rm2.channels[testChannel].lamportTimestamp == savedLamport
    # Release the second manager too, like every other restart test here.
    await rm2.cleanup()

  asyncTest "delivered messages survive restart and rebuild bloom":
    let store = newInMemoryStore()
    let rm1 = newV2Manager(store)
    check (await rm1.ensureChannel(testChannel)).isOk()

    let msg = SdsMessage.init(
      messageId = "delivered-1",
      lamportTimestamp = 1,
      causalHistory = @[],
      channelId = testChannel,
      content = @[9.byte, 9],
      bloomFilter = @[],
      senderId = "alice",
    )
    check (await rm1.addToHistory(msg, testChannel)).isOk()
    check store.log[testChannel].len == 1
    await rm1.cleanup()

    let rm2 = newV2Manager(store)
    check (await rm2.ensureChannel(testChannel)).isOk()
    let ch = rm2.channels[testChannel]
    check ch.messageHistory.len == 1
    check "delivered-1" in ch.messageHistory
    # Bloom filter rebuilt from log on bootstrap.
    check ch.bloomFilter.contains("delivered-1")
    # Release the second manager too, like every other restart test here.
    await rm2.cleanup()

  asyncTest "ack removes outgoing entry from persistence":
    let store = newInMemoryStore()
    let rm = newV2Manager(store)
    check (await rm.ensureChannel(testChannel)).isOk()

    discard await rm.wrapOutgoingMessage(@[1.byte], "msg-x", testChannel)
    check "msg-x" in store.outgoing[testChannel]

    # Synthesize an incoming message that ACKs msg-x via causal history.
    let ackMsg = SdsMessage.init(
      messageId = "ack-bearer",
      lamportTimestamp = 5,
      causalHistory = @[HistoryEntry.init("msg-x", @[])],
      channelId = testChannel,
      content = @[],
      bloomFilter = @[],
      senderId = "bob",
    )
    let serialized = serializeMessage(ackMsg).get()
    discard await rm.unwrapReceivedMessage(serialized)

    check "msg-x" notin store.outgoing[testChannel]
    await rm.cleanup()

  asyncTest "removeChannel issues exactly one dropChannel call and wipes all state":
    # Regression for PR #66 review: removal must be a single transactional
    # drop, not N per-row removes.
    let store = newInMemoryStore()
    let rm = newV2Manager(store)
    check (await rm.ensureChannel(testChannel)).isOk()
    discard await rm.wrapOutgoingMessage(@[1.byte], "msg-r", testChannel)
    check store.outgoing[testChannel].len == 1
    check store.lamports[testChannel] > 0

    check (await rm.removeChannel(testChannel)).isOk()
    check store.dropChannelCalls.getOrDefault(testChannel) == 1
    check testChannel notin store.outgoing
    check testChannel notin store.lamports
    check testChannel notin store.log
    check testChannel notin store.incoming
    check testChannel notin store.outgoingRepair
    check testChannel notin store.incomingRepair
    await rm.cleanup()

  asyncTest "noOpPersistence keeps existing manager working":
    # No persistence argument is passed, so the manager runs on its default
    # (no-op) persistence.
    let rm = newReliabilityManager(participantId = "alice").get()
    check (await rm.ensureChannel(testChannel)).isOk()
    let wrapped = await rm.wrapOutgoingMessage(@[1.byte], "msg-n", testChannel)
    check wrapped.isOk()
    let buf = await rm.getOutgoingBuffer(testChannel)
    check buf.len == 1
    await rm.cleanup()

  asyncTest "continue operating after restart: lamport stays monotonic":
    let store = newInMemoryStore()
    let rm1 = newV2Manager(store)
    check (await rm1.ensureChannel(testChannel)).isOk()
    discard await rm1.wrapOutgoingMessage(@[1.byte], "m1", testChannel)
    let lamportAfterSession1 = store.lamports[testChannel]
    check lamportAfterSession1 > 0
    await rm1.cleanup()

    # Restart and send another message — lamport must not regress.
    let rm2 = newV2Manager(store)
    check (await rm2.ensureChannel(testChannel)).isOk()
    check rm2.channels[testChannel].lamportTimestamp == lamportAfterSession1
    discard await rm2.wrapOutgoingMessage(@[2.byte], "m2", testChannel)
    check store.lamports[testChannel] > lamportAfterSession1
    let buf = await rm2.getOutgoingBuffer(testChannel)
    check buf.len == 2
    await rm2.cleanup()

  asyncTest "multiple restart cycles preserve state":
    let store = newInMemoryStore()
    for i in 1 .. 3:
      let rm = newV2Manager(store)
      check (await rm.ensureChannel(testChannel)).isOk()
      discard await rm.wrapOutgoingMessage(@[byte(i)], "m" & $i, testChannel)
      await rm.cleanup()

    # Final session: all three messages must be in the buffer.
    let rmFinal = newV2Manager(store)
    check (await rmFinal.ensureChannel(testChannel)).isOk()
    let buf = await rmFinal.getOutgoingBuffer(testChannel)
    check buf.len == 3
    var ids = newSeq[string]()
    for unack in buf:
      ids.add(unack.message.messageId.string)
    check "m1" in ids
    check "m2" in ids
    check "m3" in ids
    await rmFinal.cleanup()

  asyncTest "incoming dep-waiting buffer survives restart with missingDeps intact":
    let store = newInMemoryStore()
    let rm1 = newV2Manager(store)
    check (await rm1.ensureChannel(testChannel)).isOk()

    # Receive a message whose causal-history references an unknown predecessor.
    let depMsg = SdsMessage.init(
      messageId = "msg-with-deps",
      lamportTimestamp = 10,
      causalHistory = @[HistoryEntry.init("missing-dep", @[])],
      channelId = testChannel,
      content = @[7.byte],
      bloomFilter = @[],
      senderId = "carol",
    )
    let serialized = serializeMessage(depMsg).get()
    discard await rm1.unwrapReceivedMessage(serialized)
    check "msg-with-deps" in store.incoming[testChannel]
    await rm1.cleanup()

    # Restart — buffered message and its missing-deps set must be back.
    let rm2 = newV2Manager(store)
    check (await rm2.ensureChannel(testChannel)).isOk()
    let inbuf = await rm2.getIncomingBuffer(testChannel)
    check "msg-with-deps" in inbuf
    check "missing-dep" in inbuf["msg-with-deps"].missingDeps
    await rm2.cleanup()

  asyncTest "removeChannel + recreate does not inherit stale lamport":
    let store = newInMemoryStore()
    let rm1 = newV2Manager(store)
    check (await rm1.ensureChannel(testChannel)).isOk()
    discard await rm1.wrapOutgoingMessage(@[1.byte], "m-old", testChannel)
    check store.lamports[testChannel] > 0
    check (await rm1.removeChannel(testChannel)).isOk()
    check testChannel notin store.lamports
    await rm1.cleanup()

    # Recreate the same channelId after a restart — must start fresh.
    let rm2 = newV2Manager(store)
    check (await rm2.ensureChannel(testChannel)).isOk()
    check rm2.channels[testChannel].lamportTimestamp == 0
    let buf = await rm2.getOutgoingBuffer(testChannel)
    check buf.len == 0
    await rm2.cleanup()

  asyncTest "SDS-R outgoing repair buffer survives restart with absolute t_req_at":
    let store = newInMemoryStore()
    let rm1 = newV2Manager(store)
    check (await rm1.ensureChannel(testChannel)).isOk()

    let depMsg = SdsMessage.init(
      messageId = "msg-needs-repair",
      lamportTimestamp = 5,
      causalHistory = @[HistoryEntry.init("missing-dep", @[])],
      channelId = testChannel,
      content = @[1.byte],
      bloomFilter = @[],
      senderId = "bob",
    )
    discard await rm1.unwrapReceivedMessage(serializeMessage(depMsg).get())
    check "missing-dep" in store.outgoingRepair[testChannel]
    let originalTReqAt =
      store.outgoingRepair[testChannel]["missing-dep"].minTimeRepairReq
    check originalTReqAt.toUnix > 0
    await rm1.cleanup()

    # Restart — repair entry must be back with the SAME absolute time.
    # Codec serialises Time as int64 unix milliseconds (PLAN §1.5), so the
    # restored Time may differ by sub-millisecond precision from the
    # original. Compare at second resolution which is what the protocol
    # actually relies on.
    let rm2 = newV2Manager(store)
    check (await rm2.ensureChannel(testChannel)).isOk()
    let buf = rm2.channels[testChannel].outgoingRepairBuffer
    check "missing-dep" in buf
    check buf["missing-dep"].minTimeRepairReq.toUnix == originalTReqAt.toUnix
    await rm2.cleanup()

  asyncTest "FIFO eviction state survives restart":
    let store = newInMemoryStore()
    var smallCfg = defaultConfig()
    smallCfg.maxMessageHistory = 3
    smallCfg.bloomFilterCapacity = 3

    let rm1 = newV2Manager(store, smallCfg)
    check (await rm1.ensureChannel(testChannel)).isOk()

    # Add 5 delivered messages — first 2 should be evicted by FIFO.
    for i in 1 .. 5:
      let m = SdsMessage.init(
        messageId = "m" & $i,
        lamportTimestamp = int64(i),
        causalHistory = @[],
        channelId = testChannel,
        content = @[byte(i)],
        bloomFilter = @[],
        senderId = "alice",
      )
      check (await rm1.addToHistory(m, testChannel)).isOk()
    check store.log[testChannel].len == 3
    check "m1" notin store.log[testChannel]
    check "m2" notin store.log[testChannel]
    await rm1.cleanup()

    # Restart — evicted entries must NOT come back; survivors keep order.
    let rm2 = newV2Manager(store, smallCfg)
    check (await rm2.ensureChannel(testChannel)).isOk()
    let history = rm2.channels[testChannel].messageHistory
    check history.len == 3
    check "m1" notin history
    check "m2" notin history
    check "m3" in history
    check "m5" in history

    # FIFO continues correctly after restart: adding m6 evicts m3.
    let m6 = SdsMessage.init(
      messageId = "m6",
      lamportTimestamp = 6,
      causalHistory = @[],
      channelId = testChannel,
      content = @[6.byte],
      bloomFilter = @[],
      senderId = "alice",
    )
    check (await rm2.addToHistory(m6, testChannel)).isOk()
    check "m3" notin store.log[testChannel]
    check "m6" in store.log[testChannel]
    await rm2.cleanup()

  asyncTest "dep-clear cascade resumes correctly across a restart":
    let store = newInMemoryStore()
    let rm1 = newV2Manager(store)
    check (await rm1.ensureChannel(testChannel)).isOk()

    # Receive c (deps on b), then b (deps on a). Both must buffer.
    let msgC = SdsMessage.init(
      messageId = "c",
      lamportTimestamp = 30,
      causalHistory = @[HistoryEntry.init("b", @[])],
      channelId = testChannel,
      content = @[3.byte],
      bloomFilter = @[],
      senderId = "carol",
    )
    let msgB = SdsMessage.init(
      messageId = "b",
      lamportTimestamp = 20,
      causalHistory = @[HistoryEntry.init("a", @[])],
      channelId = testChannel,
      content = @[2.byte],
      bloomFilter = @[],
      senderId = "bob",
    )
    discard await rm1.unwrapReceivedMessage(serializeMessage(msgC).get())
    discard await rm1.unwrapReceivedMessage(serializeMessage(msgB).get())
    check "c" in store.incoming[testChannel]
    check "b" in store.incoming[testChannel]
    await rm1.cleanup()

    # Restart — both still buffered with intact missingDeps.
    let rm2 = newV2Manager(store)
    check (await rm2.ensureChannel(testChannel)).isOk()
    let inbuf = await rm2.getIncomingBuffer(testChannel)
    check "c" in inbuf
    check "b" in inbuf

    # Now receive a (root) — should cascade-deliver a, b, c.
    let msgA = SdsMessage.init(
      messageId = "a",
      lamportTimestamp = 10,
      causalHistory = @[],
      channelId = testChannel,
      content = @[1.byte],
      bloomFilter = @[],
      senderId = "alice",
    )
    discard await rm2.unwrapReceivedMessage(serializeMessage(msgA).get())

    let history = rm2.channels[testChannel].messageHistory
    check "a" in history
    check "b" in history
    check "c" in history
    let inbufFinal = await rm2.getIncomingBuffer(testChannel)
    check inbufFinal.len == 0
    await rm2.cleanup()
suite "Persistence: failure policy":
  # Each test injects a failure for one persistence op via `store.failingOps`
  # and asserts whether the manager surfaces it to the caller or carries on.
  # Managers are cleaned up at the end of each test, matching the restart
  # suite in this file.

  asyncTest "loadChannel failure surfaces as rePersistenceError on bootstrap":
    # Bootstrap durability is the semantic intent of getOrCreateChannel —
    # the caller asked us to materialise a channel and we can't do that
    # without knowing prior state. So this op DOES propagate err on load
    # failure (PLAN §8).
    let store = newInMemoryStore()
    store.failingOps.incl("loadChannel")
    let rm = newReliabilityManager(
        participantId = "alice", persistence = newInMemoryPersistence(store)
      )
      .get()
    let res = await rm.ensureChannel(testChannel)
    check res.isErr()
    check res.error == ReliabilityError.rePersistenceError
    await rm.cleanup()

  asyncTest "saveChannelMeta failure during send does NOT surface — non-fatal policy":
    # PLAN §8: persistence failures during foreground ops are logged but
    # MUST NOT abort the op. The in-memory state is the source of truth;
    # the next op's snapshot will re-synchronise on-disk state. This test
    # is the inversion of the legacy "write failure surfaces as err" —
    # the new policy is deliberate.
    let store = newInMemoryStore()
    let rm = newReliabilityManager(
        participantId = "alice", persistence = newInMemoryPersistence(store)
      )
      .get()
    check (await rm.ensureChannel(testChannel)).isOk()

    store.failingOps.incl("saveChannelMeta")
    let res = await rm.wrapOutgoingMessage(@[byte(1)], "m1", testChannel)
    # Op succeeds: bytes were produced, protocol state is correct in
    # memory, the FFI caller is unaffected.
    check res.isOk()
    # In-memory state is correct even though disk save was rejected.
    let buf = await rm.getOutgoingBuffer(testChannel)
    check buf.len == 1
    check buf[0].message.messageId == "m1"

    # Recovery: clear the failure, drive another op, disk catches up.
    store.failingOps.excl("saveChannelMeta")
    let res2 = await rm.wrapOutgoingMessage(@[byte(2)], "m2", testChannel)
    check res2.isOk()
    check "m1" in store.outgoing[testChannel]
    check "m2" in store.outgoing[testChannel]
    await rm.cleanup()

  asyncTest "updateHistory failure during send does NOT surface — non-fatal policy":
    # Same policy applied to the history-update path.
    let store = newInMemoryStore()
    let rm = newReliabilityManager(
        participantId = "alice", persistence = newInMemoryPersistence(store)
      )
      .get()
    check (await rm.ensureChannel(testChannel)).isOk()

    store.failingOps.incl("updateHistory")
    let res = await rm.wrapOutgoingMessage(@[byte(1)], "m1", testChannel)
    check res.isOk()
    check rm.channels[testChannel].messageHistory.len == 1
    await rm.cleanup()

  asyncTest "dropChannel failure during removeChannel surfaces as rePersistenceError":
    # Durability is the semantic intent of removeChannel — the caller
    # asked us to confirm a disk wipe. We cannot silently lie. So this op
    # DOES propagate err on failure (PLAN §8).
    let store = newInMemoryStore()
    let rm = newReliabilityManager(
        participantId = "alice", persistence = newInMemoryPersistence(store)
      )
      .get()
    check (await rm.ensureChannel(testChannel)).isOk()

    store.failingOps.incl("dropChannel")
    let res = await rm.removeChannel(testChannel)
    check res.isErr()
    check res.error == ReliabilityError.rePersistenceError
    await rm.cleanup()