From a1984a429f4e33e9f56a366db5dab0acd23b7e1f Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Fri, 29 May 2026 12:43:04 +0200 Subject: [PATCH] refactor(persistence): migrate checkUnacknowledgedMessages to PersistenceV2 (phase 2.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-entry saveOutgoing / removeOutgoing calls are replaced by one trySaveMeta at the end of the pass, conditional on a dirty flag (resend attempt incremented, or entry expired). Pass succeeds even if the save fails — next tick reissues the snapshot. Call-rate impact: - Before: N persistence calls per affected entry per pass. - After: at most 1 saveChannelMeta per pass; 0 when nothing aged out. All existing tests pass. Co-Authored-By: Claude Opus 4.7 --- sds.nim | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/sds.nim b/sds.nim index 15a2158..027e57b 100644 --- a/sds.nim +++ b/sds.nim @@ -449,6 +449,12 @@ proc setCallbacks*( proc checkUnacknowledgedMessages( rm: ReliabilityManager, channelId: SdsChannelID ): Future[Result[void, ReliabilityError]] {.async: (raises: []).} = + ## Persistence model (PLAN_SNAPSHOT_PERSISTENCE.md phase 2.2): per-entry + ## saveOutgoing / removeOutgoing calls are replaced by a single + ## `trySaveMeta` at the end of the pass, *only* if the buffer actually + ## changed (resend-attempt incremented, or entry expired). Failure is + ## logged but does not abort the pass — next tick reissues a fresh + ## snapshot. try: await rm.lock.acquire() try: @@ -459,6 +465,7 @@ proc checkUnacknowledgedMessages( let channel = rm.channels[channelId] let now = getTime() var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[] + var dirty = false for unackMsg in channel.outgoingBuffer: let elapsed = now - unackMsg.sendTime @@ -468,18 +475,18 @@ proc checkUnacknowledgedMessages( updatedMsg.resendAttempts += 1 updatedMsg.sendTime = now newOutgoingBuffer.add(updatedMsg) - (await rm.persistence.saveOutgoing(channelId, updatedMsg)).isOkOr: - return err(reliabilityErr(error)) + dirty = true else: if not rm.onMessageSent.isNil(): {.cast(raises: []).}: rm.onMessageSent(unackMsg.message.messageId, channelId) - (await rm.persistence.removeOutgoing(channelId, unackMsg.message.messageId)).isOkOr: - return err(reliabilityErr(error)) + dirty = true # entry dropped from newOutgoingBuffer else: newOutgoingBuffer.add(unackMsg) channel.outgoingBuffer = newOutgoingBuffer + if dirty: + await rm.trySaveMeta(channelId, channel) ok() finally: rm.lock.release()