From 7d7ea4ecbe30402858b55d3025d87fda103b5ffb Mon Sep 17 00:00:00 2001 From: darshankabariya Date: Fri, 8 May 2026 03:00:58 +0530 Subject: [PATCH] add dropChannel to persistence, replace the loop in dropChannelFromPersistence with a single call --- sds.nim | 2 +- sds/sds_utils.nim | 22 ++++++---------------- sds/types/persistence.nim | 9 +++++++++ tests/in_memory_persistence.nim | 14 ++++++++++++++ tests/test_persistence.nim | 20 ++++++++++++++------ 5 files changed, 44 insertions(+), 23 deletions(-) diff --git a/sds.nim b/sds.nim index 75e01a7..e40e421 100644 --- a/sds.nim +++ b/sds.nim @@ -471,7 +471,7 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE withLock rm.lock: try: for channelId, channel in rm.channels: - rm.dropChannelFromPersistence(channelId, channel) + rm.dropChannelFromPersistence(channelId) channel.lamportTimestamp = 0 channel.messageHistory.clear() channel.outgoingBuffer.setLen(0) diff --git a/sds/sds_utils.nim b/sds/sds_utils.nim index d3b593b..394c826 100644 --- a/sds/sds_utils.nim +++ b/sds/sds_utils.nim @@ -15,22 +15,12 @@ proc defaultConfig*(): ReliabilityConfig = return ReliabilityConfig.init() proc dropChannelFromPersistence*( - rm: ReliabilityManager, channelId: SdsChannelID, channel: ChannelContext + rm: ReliabilityManager, channelId: SdsChannelID ) {.gcsafe, raises: [].} = - ## Fires per-entry remove calls for every buffered entry in the channel. - ## Called by cleanup / removeChannel / resetReliabilityManager before they - ## wipe in-memory state, so on-disk state stays consistent. 
- for unack in channel.outgoingBuffer: - rm.persistence.removeOutgoing(channelId, unack.message.messageId) - for msgId in channel.incomingBuffer.keys: - rm.persistence.removeIncoming(channelId, msgId) - for msgId in channel.messageHistory.keys: - rm.persistence.removeLogEntry(channelId, msgId) - for msgId in channel.outgoingRepairBuffer.keys: - rm.persistence.removeOutgoingRepair(channelId, msgId) - for msgId in channel.incomingRepairBuffer.keys: - rm.persistence.removeIncomingRepair(channelId, msgId) - rm.persistence.saveLamport(channelId, 0) + ## Wipes all persisted state for a channel via a single backend call. + ## Called by removeChannel / resetReliabilityManager before they clear + ## in-memory state. Backends should run the wipe in one transaction. + rm.persistence.dropChannel(channelId) proc cleanup*(rm: ReliabilityManager) {.raises: [].} = ## Releases in-memory state. Does NOT wipe persistence — the manager may be @@ -301,7 +291,7 @@ proc removeChannel*( try: if channelId in rm.channels: let channel = rm.channels[channelId] - rm.dropChannelFromPersistence(channelId, channel) + rm.dropChannelFromPersistence(channelId) channel.outgoingBuffer.setLen(0) channel.incomingBuffer.clear() channel.messageHistory.clear() diff --git a/sds/types/persistence.nim b/sds/types/persistence.nim index ffaf806..673c7d7 100644 --- a/sds/types/persistence.nim +++ b/sds/types/persistence.nim @@ -75,6 +75,13 @@ type removeIncomingRepair*: proc(channelId: SdsChannelID, msgId: SdsMessageID) {.gcsafe, raises: [].} + # Wipe all persisted state for a channel in one transactional call. + # Called by removeChannel / resetReliabilityManager. Backends should + # implement this atomically (e.g. one BEGIN/COMMIT) — a per-row loop on + # the nim-sds side would mean N fsyncs per drop. + dropChannel*: + proc(channelId: SdsChannelID) {.gcsafe, raises: [].} + # Bootstrap on `addChannel` / `getOrCreateChannel`. 
loadAllForChannel*: proc(channelId: SdsChannelID): ChannelSnapshot {.gcsafe, raises: [].} @@ -112,6 +119,8 @@ proc noOpPersistence*(): Persistence = discard, removeIncomingRepair: proc(channelId: SdsChannelID, msgId: SdsMessageID) = discard, + dropChannel: proc(channelId: SdsChannelID) = + discard, loadAllForChannel: proc(channelId: SdsChannelID): ChannelSnapshot = ChannelSnapshot(), ) diff --git a/tests/in_memory_persistence.nim b/tests/in_memory_persistence.nim index 3f44909..f3c7d88 100644 --- a/tests/in_memory_persistence.nim +++ b/tests/in_memory_persistence.nim @@ -14,6 +14,9 @@ type InMemoryStore* = ref object incoming*: Table[SdsChannelID, OrderedTable[SdsMessageID, IncomingMessage]] outgoingRepair*: Table[SdsChannelID, OrderedTable[SdsMessageID, OutgoingRepairEntry]] incomingRepair*: Table[SdsChannelID, OrderedTable[SdsMessageID, IncomingRepairEntry]] + dropChannelCalls*: Table[SdsChannelID, int] + ## Per-channel counter; lets tests assert dropChannel is invoked exactly + ## once per logical drop (not N times — see PR #66 review). 
proc newInMemoryStore*(): InMemoryStore = InMemoryStore() @@ -89,6 +92,17 @@ proc newInMemoryPersistence*(store: InMemoryStore): Persistence = if channelId in store.incomingRepair: store.incomingRepair[channelId].del(msgId), + dropChannel: proc(channelId: SdsChannelID) {.gcsafe, raises: [].} = + {.cast(raises: []).}: + store.lamports.del(channelId) + store.log.del(channelId) + store.outgoing.del(channelId) + store.incoming.del(channelId) + store.outgoingRepair.del(channelId) + store.incomingRepair.del(channelId) + store.dropChannelCalls[channelId] = + store.dropChannelCalls.getOrDefault(channelId) + 1, + loadAllForChannel: proc(channelId: SdsChannelID): ChannelSnapshot {.gcsafe, raises: [].} = {.cast(raises: []).}: var snap = ChannelSnapshot() diff --git a/tests/test_persistence.nim b/tests/test_persistence.nim index fc5e36a..c2ab4fe 100644 --- a/tests/test_persistence.nim +++ b/tests/test_persistence.nim @@ -91,7 +91,9 @@ suite "Persistence: write → restart → read-back": check "msg-x" notin store.outgoing[testChannel] rm.cleanup() - test "removeChannel fires per-entry removes": + test "removeChannel issues exactly one dropChannel call and wipes all state": + # Regression for PR #66 review: removal must be a single transactional + # drop, not N per-row removes — otherwise SQLite eats N fsyncs per drop. 
let store = newInMemoryStore() let p = newInMemoryPersistence(store) let rm = newReliabilityManager(persistence = p).get() @@ -99,9 +101,15 @@ suite "Persistence: write → restart → read-back": discard rm.wrapOutgoingMessage(@[1.byte], "msg-r", testChannel) check store.outgoing[testChannel].len == 1 check store.lamports[testChannel] > 0 + check rm.removeChannel(testChannel).isOk() - check store.outgoing[testChannel].len == 0 - check store.lamports[testChannel] == 0 + check store.dropChannelCalls.getOrDefault(testChannel) == 1 + check testChannel notin store.outgoing + check testChannel notin store.lamports + check testChannel notin store.log + check testChannel notin store.incoming + check testChannel notin store.outgoingRepair + check testChannel notin store.incomingRepair rm.cleanup() test "noOpPersistence keeps existing manager working": @@ -186,8 +194,8 @@ suite "Persistence: write → restart → read-back": rm2.cleanup() test "removeChannel + recreate does not inherit stale lamport": - # Regression: dropChannelFromPersistence used to skip saveLamport(0), - # so the channels row survived removeChannel and a recreate inherited it. + # Regression: dropChannel must wipe the lamport row; otherwise a recreate + # of the same channelId after restart picks up the old timestamp. let store = newInMemoryStore() let p1 = newInMemoryPersistence(store) let rm1 = newReliabilityManager(persistence = p1).get() @@ -195,7 +203,7 @@ suite "Persistence: write → restart → read-back": discard rm1.wrapOutgoingMessage(@[1.byte], "m-old", testChannel) check store.lamports[testChannel] > 0 check rm1.removeChannel(testChannel).isOk() - check store.lamports[testChannel] == 0 + check testChannel notin store.lamports rm1.cleanup() # Recreate the same channelId after a restart — must start fresh.