diff --git a/sds.nim b/sds.nim index 15a2158..027e57b 100644 --- a/sds.nim +++ b/sds.nim @@ -449,6 +449,12 @@ proc setCallbacks*( proc checkUnacknowledgedMessages( rm: ReliabilityManager, channelId: SdsChannelID ): Future[Result[void, ReliabilityError]] {.async: (raises: []).} = + ## Persistence model (PLAN_SNAPSHOT_PERSISTENCE.md phase 2.2): per-entry + ## saveOutgoing / removeOutgoing calls are replaced by a single + ## `trySaveMeta` at the end of the pass, *only* if the buffer actually + ## changed (resend-attempt incremented, or entry expired). Failure is + ## logged but does not abort the pass — next tick reissues a fresh + ## snapshot. try: await rm.lock.acquire() try: @@ -459,6 +465,7 @@ proc checkUnacknowledgedMessages( let channel = rm.channels[channelId] let now = getTime() var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[] + var dirty = false for unackMsg in channel.outgoingBuffer: let elapsed = now - unackMsg.sendTime @@ -468,18 +475,18 @@ proc checkUnacknowledgedMessages( updatedMsg.resendAttempts += 1 updatedMsg.sendTime = now newOutgoingBuffer.add(updatedMsg) - (await rm.persistence.saveOutgoing(channelId, updatedMsg)).isOkOr: - return err(reliabilityErr(error)) + dirty = true else: if not rm.onMessageSent.isNil(): {.cast(raises: []).}: rm.onMessageSent(unackMsg.message.messageId, channelId) - (await rm.persistence.removeOutgoing(channelId, unackMsg.message.messageId)).isOkOr: - return err(reliabilityErr(error)) + dirty = true # entry dropped from newOutgoingBuffer else: newOutgoingBuffer.add(unackMsg) channel.outgoingBuffer = newOutgoingBuffer + if dirty: + await rm.trySaveMeta(channelId, channel) ok() finally: rm.lock.release()