feat(persistence): add snapshot types and codec (phase 0)

Introduce atomic-snapshot persistence types that will replace the current
fine-grained 13-proc Persistence interface. This commit is purely additive:
no existing call site changes, no behaviour change.

New types (sds/types/):
- channel_meta.nim — ChannelMeta (atomic per-channel snapshot blob),
  ChannelData (bootstrap payload), OutgoingRepairKV / IncomingRepairKV
  (flattened map entries for protobuf wire shape).
- history_update.nim — HistoryUpdate (combined append/evict payload for
  the message log).

New codec (sds/snapshot_codec.nim):
- Protobuf encode/decode for all new types, reusing the existing
  SdsMessage and HistoryEntry encoders from sds/protobuf.nim.
- Explicit schemaVersion=1 on ChannelMeta; decoder rejects unknown
  versions loudly rather than silently truncating.
- Time encoded as int64 unix milliseconds.

Tests (tests/test_snapshot_codec.nim):
- 13 round-trip cases covering empty, single-entry, full-buffer, and
  repair-heavy snapshots; ChannelData ordering; HistoryUpdate variants;
  schemaVersion rejection.

Planning artefacts:
- ANALYSIS_SDS_PERSISTENCE.md — problem statement (partial-write
  divergence, chatty call rate, non-fatal-error policy gap).
- ANALYSIS_SNAPSHOT_SAVE_POINTS.md — exact save points per protocol op
  and projected call rates.
- PLAN_SNAPSHOT_PERSISTENCE.md — phased refactor plan; this commit
  implements phase 0.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
NagyZoltanPeter 2026-05-29 12:33:17 +02:00
parent 145e5d6459
commit 979a66360b
GPG Key ID: 3E1F97CF4A7B6F42
8 changed files with 1487 additions and 0 deletions

ANALYSIS_SDS_PERSISTENCE.md Normal file

@ -0,0 +1,240 @@
# SDS Workflow & Persistence Layer — Honest Analysis
## 1. Architecture Overview
The SDS protocol lives in two layers:
| Layer | Files | Responsibility |
|-------|-------|---------------|
| **Core types + helpers** | `sds/sds_utils.nim`, `sds/types/*.nim` | State types, Lamport clock, history management, bloom filter, dependency checking |
| **Protocol orchestration** | `sds.nim` (root module) | `wrapOutgoingMessage`, `unwrapReceivedMessage`, `markDependenciesMet`, periodic tasks |
The `Persistence` interface (`sds/types/persistence.nim`) is a struct of 13 async proc fields. It is injected at `newReliabilityManager` construction time. Default: `noOpPersistence()` — discards all writes, returns empty snapshots.
## 2. SDS Workflow (excluding `library/`)
### Send path (`sds.nim:87-174`, `wrapOutgoingMessage`)
```
acquire lock
→ getOrCreateChannel (loads from persistence if first time)
→ updateLamportTimestamp → saveLamport
→ serialize bloom filter
→ collect expired SDS-R repair requests → removeOutgoingRepair (per entry)
→ build causal history → setRetrievalHint (per entry, if hint provider set)
→ construct SdsMessage
→ add to outgoingBuffer → saveOutgoing
→ add to bloom filter (memory only)
→ addToHistory → appendLogEntry + removeLogEntry (eviction)
→ serialize → return bytes
release lock
```
**Persistence calls per send**: 3-5+ depending on repair buffer and history eviction.
### Receive path (`sds.nim:235-376`, `unwrapReceivedMessage`)
```
extractChannelId + deserialize
getOrCreateChannel (may loadAllForChannel)
→ cleanup repair buffers: removeOutgoingRepair, removeIncomingRepair
→ duplicate check (return early if in history)
→ add to bloom filter (memory only)
→ updateLamportTimestamp → saveLamport
→ reviewAckStatus → removeOutgoing (per acked message)
→ process SDS-R repair requests → removeOutgoingRepair + saveIncomingRepair (per entry)
→ check dependencies:
    - all met, no buffer deps: addToHistory → appendLogEntry; unblock buffered → saveIncoming per; processIncomingBuffer
    - all met, but deps in buffer: saveIncoming
    - missing deps: saveIncoming; create SDS-R outgoing entries → saveOutgoingRepair per
return
```
**Persistence calls per receive**: 4-15+ depending on repair entries, ack status, and dependency resolution depth.
### Background tasks (`sds.nim:487-571`)
| Task | Interval | Persistence calls |
|------|----------|-------------------|
| `periodicBufferSweep` | `bufferSweepInterval` | `saveOutgoing` or `removeOutgoing` per resend/expiry |
| `periodicSyncMessage` | `syncMessageInterval` | None (callback only) |
| `periodicRepairSweep` | `repairSweepInterval` | `removeIncomingRepair`, `removeOutgoingRepair` per expired entry |
Background tasks **discard** persistence errors (`discard await rm.runRepairSweep()` at line 568, `discard await rm.checkUnacknowledgedMessages(channelId)` at line 494).
### Bootstrap (`sds/sds_utils.nim:289-322`, `getOrCreateChannel`)
```
loadAllForChannel → ChannelSnapshot
→ populate lamportTimestamp
→ populate messageHistory + rebuild bloom filter from it
→ populate outgoingBuffer, incomingBuffer
→ populate outgoingRepairBuffer, incomingRepairBuffer
```
**Bloom filter is never persisted** — rebuilt from message history. This is documented and intentional.
## 3. Persistence Interface Shape (SQLite Backend Perspective)
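The 13 operations map naturally to SQLite tables. The SQL below is shorthand: SQLite has no `UPSERT INTO` statement (the real spelling is `INSERT ... ON CONFLICT ... DO UPDATE`), and `FROM *` stands for one statement per table: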
| Operation | SQLite analogue |
|-----------|----------------|
| `saveLamport` | `UPSERT INTO lamport_clocks (channel_id, ts)` |
| `appendLogEntry` | `INSERT INTO message_log (channel_id, msg_id, blob)` |
| `removeLogEntry` | `DELETE FROM message_log WHERE msg_id = ?` |
| `setRetrievalHint` | `UPDATE message_log SET hint = ? WHERE msg_id = ?` |
| `saveOutgoing` | `UPSERT INTO outgoing_buffer (channel_id, msg_id, blob)` |
| `removeOutgoing` | `DELETE FROM outgoing_buffer WHERE msg_id = ?` |
| `saveIncoming` | `UPSERT INTO incoming_buffer (channel_id, msg_id, blob)` |
| `removeIncoming` | `DELETE FROM incoming_buffer WHERE msg_id = ?` |
| `saveOutgoingRepair` | `UPSERT INTO outgoing_repair (channel_id, msg_id, blob)` |
| `removeOutgoingRepair` | `DELETE FROM outgoing_repair WHERE msg_id = ?` |
| `saveIncomingRepair` | `UPSERT INTO incoming_repair (channel_id, msg_id, blob)` |
| `removeIncomingRepair` | `DELETE FROM incoming_repair WHERE msg_id = ?` |
| `dropChannel` | `DELETE FROM * WHERE channel_id = ?` (all tables) |
| `loadAllForChannel` | `SELECT * FROM * WHERE channel_id = ?` (all tables) |
Minimum schema: 5 tables (lamport_clocks, message_log, outgoing_buffer, incoming_buffer, repair_entries with a direction column — or 6 if outgoing/incoming repair are separated).
---
## 4. Risk Analysis
### CRITICAL — No Transactional Atomicity Across Persistence Calls
**Risk level: HIGH**
Every protocol operation makes **multiple independent persistence calls**. Example from `unwrapReceivedMessage`:
```
removeOutgoingRepair ← succeeds
removeIncomingRepair ← succeeds
saveLamport ← succeeds
removeOutgoing ← succeeds
appendLogEntry ← FAILS
```
If `appendLogEntry` fails mid-way, the in-memory state has already been mutated (bloom filter updated, buffers modified, Lamport clock advanced). The function returns `err()` to the caller, but:
1. **In-memory state is now ahead of disk state.** The message is in the bloom filter and history in memory but not on disk.
2. **On restart, the snapshot will be stale.** `loadAllForChannel` rebuilds from disk — the message won't be in history, bloom filter will be rebuilt without it, but other nodes may already consider it delivered.
3. **There is no rollback of prior successful persistence calls.** The Lamport clock is already persisted at the new value, repair buffer entries are already deleted.
**Impact**: After a crash following a partial persistence failure, the node's state diverges from what peers believe. Causal ordering assumptions break. Duplicate delivery or permanent buffering of dependent messages becomes possible.
**Mitigation for a SQLite backend**: Wrap all persistence calls within a single protocol operation in one `BEGIN … COMMIT` transaction. The current interface design (individual proc fields) makes this structurally impossible — there's no transaction boundary concept.
### HIGH — In-Memory Mutation Before Persistence Confirmation
**Risk level: HIGH**
Throughout the codebase, the pattern is:
```nim
# mutate in-memory state
channel.outgoingRepairBuffer.del(msg.messageId) # memory mutated
# then persist
(await rm.persistence.removeOutgoingRepair(...)).isOkOr:
  return err(...) # too late to undo memory
```
This appears in `unwrapReceivedMessage` (lines 256-261), `wrapOutgoingMessage` (lines 131-133), `reviewAckStatus` (lines 77-81), and throughout `processIncomingBuffer`.
If persistence fails, the function returns an error, but the in-memory state has already been modified. The caller cannot retry because the state is now inconsistent.
**Exception**: `addToHistory` (sds_utils.nim:91-92) correctly mutates memory first then persists, but on failure, the memory mutation is **not rolled back**.
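A persist-first ordering would avoid this pattern. The sketch below is a minimal, synchronous illustration under stated assumptions, not the nim-sds code: `Store`, `failNext`, `removeRow`, and `removeRepair` are hypothetical names, and the real interface is async and returns `Result`.

```nim
import std/tables

type
  Store = object
    ## Hypothetical stand-in for a persistence backend.
    failNext: bool                 # failure-injection switch (assumption)
    rows: Table[string, string]

  Channel = object
    outgoingRepairBuffer: Table[string, string]

proc removeRow(s: var Store, key: string): bool =
  ## Returns false to simulate a failed write; true once the row is gone.
  if s.failNext:
    return false
  s.rows.del(key)
  true

proc removeRepair(ch: var Channel, s: var Store, id: string): bool =
  ## Persist first; touch memory only after the store has confirmed.
  if not s.removeRow(id):
    return false                   # memory untouched, so the caller may retry
  ch.outgoingRepairBuffer.del(id)
  true
```

With this ordering a failed write leaves the in-memory buffer exactly as it was, so a retry starts from a consistent state.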
### HIGH — Background Tasks Silently Swallow Persistence Errors
**Risk level: MEDIUM-HIGH**
```nim
# sds.nim:494
discard await rm.checkUnacknowledgedMessages(channelId)
# sds.nim:568
discard await rm.runRepairSweep()
```
`checkUnacknowledgedMessages` modifies `channel.outgoingBuffer` (line 478: `channel.outgoingBuffer = newOutgoingBuffer`) and persists entries. If persistence fails partway through, the in-memory buffer has already been rewritten. The `discard` means the error isn't even visible to any caller.
The comment says "next tick retries" — but next tick operates on the already-mutated in-memory state, not the stale disk state. After a restart, disk state wins and the divergence materializes.
### MEDIUM — History Eviction Is Multi-Step Without Atomicity
**Risk level: MEDIUM**
`addToHistory` (sds_utils.nim:81-106):
```nim
channel.messageHistory[msg.messageId] = msg # insert
(await rm.persistence.appendLogEntry(...)).isOkOr: # persist insert
  return err(...)
while channel.messageHistory.len > max:
  # evict oldest
  channel.messageHistory.del(firstKey)
  (await rm.persistence.removeLogEntry(...)).isOkOr: # persist eviction
    return err(...)
```
If the append succeeds but an eviction `removeLogEntry` fails: on restart, the history will contain entries beyond `maxMessageHistory`. Not catastrophic but violates the capacity invariant and could grow unbounded over repeated failures.
### MEDIUM — `dropChannel` Atomicity Depends Entirely on Backend
```nim
# sds_utils.nim:27-35
(await rm.persistence.dropChannel(channelId)).isOkOr:
  return err(reliabilityErr(error))
```
The comment on `persistence.nim:103-106` says "Backends should implement this atomically (e.g. one BEGIN/COMMIT)." Good — but there's no enforcement. A naive SQLite backend that does `DELETE FROM t1; DELETE FROM t2; ...` without a transaction could leave partial state.
### MEDIUM — `ChannelSnapshot.messageHistory` Ordering Assumption
```nim
# persistence.nim:41-42
messageHistory*: seq[SdsMessage]
  ## MUST be ordered oldest-first.
```
The contract says "MUST be ordered oldest-first" — but there's no validation in `getOrCreateChannel`. If a SQLite backend returns messages in wrong order (e.g. missing `ORDER BY lamport_timestamp, message_id`), the `OrderedTable` insertion order will be wrong, corrupting causal history tail selection and FIFO eviction silently.
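A load-time check of that contract could look roughly like the sketch below. It assumes the ordering key is `lamportTimestamp` ascending with `messageId` as tie-break; `Msg` and `isOldestFirst` are illustrative names, and the message id is modelled as a plain `string`.

```nim
type Msg = object
  ## Illustrative subset of a message; only the ordering fields.
  lamportTimestamp: int64
  messageId: string

proc isOldestFirst(history: openArray[Msg]): bool =
  ## True when every entry sorts at or after its predecessor by
  ## (lamportTimestamp, messageId).
  for i in 1 ..< history.len:
    let prev = history[i - 1]
    let cur = history[i]
    if cur.lamportTimestamp < prev.lamportTimestamp or
       (cur.lamportTimestamp == prev.lamportTimestamp and
        cur.messageId < prev.messageId):
      return false
  true
```

A bootstrap path could refuse the snapshot (or log loudly) when this returns `false`, instead of silently building a misordered `OrderedTable`.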
### LOW — Bloom Filter Rebuild Correctness
The bloom filter is rebuilt from `messageHistory` on bootstrap — which is capped at `maxMessageHistory` entries. Messages evicted from history won't be in the rebuilt bloom filter. This means:
- After restart, the bloom filter covers fewer messages than before the crash.
- Peers may believe we have messages (based on a pre-crash bloom snapshot they received) that we no longer claim to have.
- This can trigger unnecessary SDS-R repair requests.
This is a **known design tradeoff**, documented in `persistence.nim:30-32`. Impact is limited to repair overhead, not correctness.
### LOW — No Backend Exists in This Repo
There is no SQLite backend (or any real backend) in nim-sds. The `noOpPersistence()` default means:
- All tests run without durability.
- The persistence interface is untested against real I/O failure modes.
- Any bugs in the interface contract (ordering, atomicity) won't surface until a backend is integrated.
---
## 5. Summary Verdict
| Area | Grade | Notes |
|------|-------|-------|
| Interface design | **B+** | Clean, well-documented, 13 focused operations. Missing: transaction boundaries. |
| Error propagation | **B** | Consistent `Result[T, string]` → `rePersistenceError` mapping. But background tasks discard errors. |
| In-memory/disk consistency | **D** | No rollback on partial failure. Memory-first mutation pattern throughout. |
| Atomicity | **D** | Multi-call operations have no transaction concept. Partial writes are structurally possible. |
| Bootstrap correctness | **B-** | Works correctly IF backend orders history right. No validation. Bloom rebuild is lossy by design. |
| Test coverage of persistence | **F** | Zero tests exercise a real backend. All tests use noOpPersistence. |
### Recommendations
1. **Add a transaction/batch concept to the Persistence interface.** Even a simple `beginBatch`/`commitBatch` pair would let a SQLite backend wrap multi-step operations atomically.
2. **Reverse the mutation order**: persist first, mutate memory on success. This eliminates the in-memory-ahead-of-disk divergence.
3. **Don't discard background task results.** At minimum, log them. Better: track failure counts and surface them via a health check callback.
4. **Validate `ChannelSnapshot` ordering in `getOrCreateChannel`.** Assert `lamportTimestamp` monotonicity on the loaded `messageHistory`.
5. **Write integration tests with a real (in-memory SQLite) backend** that exercises failure injection — kill persistence mid-operation and verify recovery.


@ -0,0 +1,71 @@
# SDS State Snapshot — Save Points & Call Rate
Snapshot = `saveChannelMeta(channelId, ChannelMeta)` carrying: `lamportTimestamp`,
`outgoingBuffer`, `incomingBuffer`, `outgoingRepairBuffer`, `incomingRepairBuffer`.
(Bloom filter excluded — rebuilt from history. History persisted separately via
`updateHistory(append, evict)`.)
**Rule: exactly one snapshot save per protocol operation, fired at the end, under
the lock, only if meta actually changed (dirty flag).**
## Save Points
| # | Operation | Save? | When | Condition |
|---|-----------|-------|------|-----------|
| 1 | `wrapOutgoingMessage` (sds.nim:163) | **1** | end, before `serializeMessage` | always (lamport + outgoingBuffer always mutate) |
| 2 | `unwrapReceivedMessage` (sds.nim:373) | **0 or 1** | end, before `return` | 0 on duplicate early-return (line 264); else 1 — covers all 3 paths |
| 3 | `markDependenciesMet` (sds.nim:415) | **1** | end, after `processIncomingBuffer` | if any dep matched |
| 4 | `checkUnacknowledgedMessages` (sds.nim:478) | **0 or 1** | end of pass | only if buffer changed (resend/expiry) |
| 5 | `runRepairSweep` (sds.nim:556) | **0..C** | per channel, end of channel loop | one per *dirty* channel only |
`processIncomingBuffer` and `reviewAckStatus` become pure in-memory helpers — they
never save; the calling op (2 or 3) persists once at the end.
## Rate Model (per channel)
Let `S` = sends/s, `R` = non-duplicate receives/s.
```
snapshot_rate ≈ S + R                                            (foreground, dominant)
              + 1/repairSweepInterval  if repair buffers dirty   = 0.2/s
              + 1/bufferSweepInterval  if outgoing buffer dirty  = 0.0167/s
```
`repairSweepInterval = 5s`, `bufferSweepInterval = 60s`.
### Background floor (zero traffic)
With dirty-flag guard: **0 saves/s** on a quiet channel (empty buffers → nothing to
persist). Without the guard, the 5s repair sweep alone would force 0.2 saves/s/channel
even when idle — so the dirty-flag guard is mandatory, not optional.
### Worked examples
| Scenario | Channels | Per-ch S+R | Foreground | Background | Total snapshot/s |
|----------|----------|-----------|-----------|-----------|------------------|
| Idle | 10 | 0 | 0 | 0 (guarded) | **0** |
| Light chat | 5 | 1 | 5 | ~0.2 | **~5** |
| Busy | 10 | 6 | 60 | ~2 | **~62** |
| Heavy / lossy (SDS-R churning) | 10 | 20 | 200 | ~2 | **~202** |
Background is negligible vs foreground whenever there is traffic. The snapshot rate
is essentially **one write per protocol message** — bounded by network throughput,
not by internal mutation count.
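The rate model can be evaluated with a few lines of Nim. This is a back-of-envelope sketch: `snapshotRate` and its parameters are illustrative names, and the number of channels that are dirty at each sweep is an input assumption rather than something the model predicts.

```nim
const
  repairSweepIntervalS = 5.0    # seconds, from the rate model
  bufferSweepIntervalS = 60.0   # seconds, from the rate model

proc snapshotRate(channels: int, perChannelSR: float,
                  dirtyRepairChannels, dirtyBufferChannels: int): float =
  ## Total snapshot saves per second across all channels.
  ## Foreground: one save per send or non-duplicate receive.
  ## Background: one save per sweep for each channel that is dirty.
  let foreground = channels.float * perChannelSR
  let background = dirtyRepairChannels.float / repairSweepIntervalS +
                   dirtyBufferChannels.float / bufferSweepIntervalS
  foreground + background

# "Busy" row: 10 channels at S+R = 6, all with dirty repair buffers.
echo snapshotRate(10, 6.0, 10, 0)
```

With ten channels dirty at every repair sweep the background term is 10/5 = 2 saves/s, which reproduces the "Busy" total of about 62.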
## Why this is safe for SQLite-on-a-thread
- 1 snapshot write per message → 1 cross-thread round-trip, 1 `UPSERT` of a single
blob row, foldable into the same transaction as the `updateHistory` call.
- Snapshot blob is **small**: buffer sizes are bounded by traffic-in-flight, not by
`maxMessageHistory`. Typical < a few KB even under load.
- vs. current fine-grained interface (10-15 calls/op), this is a **5-10× reduction**
in cross-thread round-trips and SQLite operations, with atomic crash consistency.
## Snapshot vs History rate (separation payoff)
| | Snapshot (`saveChannelMeta`) | History (`updateHistory`) |
|---|---|---|
| Append rate | n/a | S + R_delivered (every delivered msg) |
| Evict | n/a | batched, only past maxMessageHistory=1000 |
| Save rate | S + R (every msg) | S + R_delivered |
| Blob size | small (buffers) | large but append-only |
| Coupling | both fire together at op end → 1 SQLite txn |


@ -0,0 +1,502 @@
# SDS Snapshot Persistence — Design & Refactor Plan
Companion to `ANALYSIS_SDS_PERSISTENCE.md` (problem statement) and
`ANALYSIS_SNAPSHOT_SAVE_POINTS.md` (where & how often we save).
This document defines:
1. **Data structures** to be persisted (snapshot + history)
2. **New `Persistence` interface** (5 procs replacing the current 13)
3. **Refactor plan** — phased, test-gated, backward-compatible interim state
---
## 1. Data Structure Design
### 1.1 Design principles
| Principle | Reason |
|-----------|--------|
| Snapshot is **one atomic blob** | Eliminates partial-write divergence (the root cause from ANALYSIS_SDS_PERSISTENCE.md §4) |
| Snapshot is **small** (buffers only, no history) | Keeps per-op write cost ≤ a few KB; foldable into one SQLite txn |
| History is **separate, append-batched** | Large data, append-mostly, queryable by msg_id for SDS-R |
| Bloom filter is **not persisted** | Already the case — rebuilt from history on bootstrap |
| **Versioned wire format** | Allow future schema evolution without breaking on-disk data |
| **Protobuf** serialization | Project already uses it (`sds/protobuf.nim`); keeps one codec |
### 1.2 `ChannelMeta` — the snapshot payload
```nim
# sds/types/channel_meta.nim (new file)
import std/[tables, times]
import ./sds_message_id
import ./unacknowledged_message
import ./incoming_message
import ./repair_entry

export
  sds_message_id, unacknowledged_message, incoming_message, repair_entry

const ChannelMetaSchemaVersion* = 1'u32

# A single `type` section, so ChannelMeta can reference the KV types
# that are declared after it.
type
  ChannelMeta* = object
    ## Atomic snapshot of the fast-changing per-channel protocol state.
    ## Persisted as one blob per `saveChannelMeta` call. Bloom filter is
    ## intentionally absent: it is rebuilt from the message log on bootstrap.
    ## Message history is also absent: it is persisted separately via
    ## `updateHistory` because it is large and append-mostly.
    schemaVersion*: uint32
      ## On-disk format version. Backends MUST refuse to load a meta whose
      ## version they don't know how to decode rather than silently truncating
      ## or zero-filling unknown fields.
    lamportTimestamp*: int64
    outgoingBuffer*: seq[UnacknowledgedMessage]
      ## Sent-but-not-yet-acked messages. Order matters: the protocol iterates
      ## in insertion order for resend-attempt accounting.
    incomingBuffer*: seq[IncomingMessage]
      ## Received-but-not-yet-deliverable messages, each carrying its
      ## still-missing dependency set. Order is irrelevant; flattened from
      ## the in-memory `Table` for wire-friendliness.
    outgoingRepairBuffer*: seq[OutgoingRepairKV]
    incomingRepairBuffer*: seq[IncomingRepairKV]
      ## SDS-R repair buffers, flattened from in-memory `Table` to seq of
      ## (key, value) for stable serialization.

  OutgoingRepairKV* = object
    messageId*: SdsMessageID
    entry*: OutgoingRepairEntry

  IncomingRepairKV* = object
    messageId*: SdsMessageID
    entry*: IncomingRepairEntry
```
**Why flatten the `Table`s to `seq`s?**
Protobuf has no native map of `SdsMessageID → object`. Flattening to `seq` of KV
objects gives deterministic encoding and trivial decode-time rebuild of the
in-memory `Table`. The cost is one extra alloc per entry on encode/decode —
negligible vs. the I/O it replaces.
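The flatten/rebuild round trip can be sketched as follows. The names here (`RepairEntry`, `deadlineMs`, `RepairKV`, `flatten`, `rebuild`) are placeholders rather than the actual nim-sds types, and sorting by key is an added assumption so that the output order does not depend on hash-table iteration order.

```nim
import std/[tables, algorithm]

type
  RepairEntry = object
    deadlineMs: int64          # placeholder payload field (assumption)

  RepairKV = object
    messageId: string
    entry: RepairEntry

proc flatten(t: Table[string, RepairEntry]): seq[RepairKV] =
  ## Table -> seq of (key, value) objects, sorted by key.
  for id, entry in t:
    result.add(RepairKV(messageId: id, entry: entry))
  result.sort(proc(a, b: RepairKV): int = cmp(a.messageId, b.messageId))

proc rebuild(kvs: openArray[RepairKV]): Table[string, RepairEntry] =
  ## seq of (key, value) objects -> Table, as done at decode time.
  for kv in kvs:
    result[kv.messageId] = kv.entry
```

`rebuild(flatten(t))` yields a table equal to `t`, which is the property a codec round-trip test would check.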
**Why an explicit `schemaVersion`?**
The current interface has no version field. Adding fields later (e.g., a new
SDS-R counter) silently truncates old data on load. The version makes
incompatibility explicit; backends fail loud instead of corrupting state.
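The "fail loud" behaviour amounts to a version gate at the start of decoding. A minimal sketch, in which `VersionCheck` and `checkSchemaVersion` are hypothetical helpers and not the codec's actual API:

```nim
const ChannelMetaSchemaVersion = 1'u32

type VersionCheck = object
  ok: bool
  error: string                # empty when ok

proc checkSchemaVersion(found: uint32): VersionCheck =
  ## Accept only the version this build knows how to decode.
  if found == ChannelMetaSchemaVersion:
    VersionCheck(ok: true)
  else:
    VersionCheck(ok: false,
                 error: "unsupported ChannelMeta schemaVersion: " & $found)
```

A decoder would run this check before reading any other field and return the error instead of a partially populated `ChannelMeta`.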
### 1.3 `HistoryUpdate`: the history-write payload
```nim
# extension to sds/types/persistence.nim or new history_update.nim
type HistoryUpdate* = object
  ## Combined append/evict for one protocol operation. Empty `append` and
  ## empty `evict` ⇒ caller should skip the call entirely.
  append*: seq[SdsMessage]
    ## New delivered messages, in delivery order (matters for SDS-R retrieval
    ## hint correctness and FIFO eviction on the backend side).
  evict*: seq[SdsMessageID]
    ## Oldest messages now past `maxMessageHistory`. Backend deletes by id.
```
`append` is a `seq` (not a single `SdsMessage`) because `processIncomingBuffer`
can deliver a chain of unblocked messages in one call to the parent op
(`unwrapReceivedMessage` / `markDependenciesMet`). Sending them all in one
`updateHistory` call keeps the "one save per protocol op" guarantee.
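One way an in-memory `addToHistory` could accumulate such an update is sketched below. The types are simplified stand-ins (`Msg` carries only an id, ids are plain strings), and the signature is an assumption rather than the planned nim-sds one.

```nim
import std/tables

type
  Msg = object
    messageId: string

  HistoryUpdate = object
    append: seq[Msg]
    evict: seq[string]

proc addToHistory(history: var OrderedTable[string, Msg], msg: Msg,
                  maxLen: int, update: var HistoryUpdate) =
  ## Mutates memory only; records what a later `updateHistory` call
  ## would have to append and evict.
  history[msg.messageId] = msg
  update.append.add(msg)
  while history.len > maxLen:
    var oldest: string
    for k in history.keys:       # first key = oldest insertion
      oldest = k
      break
    history.del(oldest)
    update.evict.add(oldest)
```

The calling op can invoke this once per delivered message and hand the accumulated `HistoryUpdate` to the backend in a single call at the end.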
### 1.4 `ChannelData` — the bootstrap payload
```nim
type ChannelData* = object
  ## Returned by `loadChannel` on `getOrCreateChannel` bootstrap.
  ## Carries everything needed to rebuild the in-memory `ChannelContext`
  ## from a clean restart.
  meta*: ChannelMeta
  messageHistory*: seq[SdsMessage]
    ## MUST be ordered oldest-first (lamportTimestamp ASC, tie-break msg_id
    ## ASC). Bloom filter is rebuilt from this on load; FIFO eviction relies
    ## on this ordering. Backend contract; validated by nim-sds on load.
```
### 1.5 Storage encoding (internal to nim-sds — not the SDS network wire format)
**Disambiguation.** The SDS **network** wire format (bytes peers exchange) is
handled by the existing `sds/protobuf.nim` and is untouched by this plan.
What this section defines is the **storage** encoding: the codec nim-sds uses
to turn a `ChannelMeta` Nim object into the opaque `seq[byte]` blob it hands
to `saveChannelMeta`. The KV persistence worker treats that blob as
fully opaque — it stores `(key: bytes) → (value: bytes)` and does its own
buffering/batching of writes. Whether nim-sds uses protobuf, CBOR, or
anything else is invisible to the worker.
**Why this codec exists at all.** The worker stores bytes; something must
produce those bytes from the in-memory `ChannelMeta`. That responsibility
sits inside nim-sds, on the producer side of the persistence boundary. It
runs synchronously inside `saveChannelMeta`, before the blob crosses to the
worker.
**Choice: protobuf, reusing the existing toolchain.**
- `sds/protobuf.nim` is already a dependency and already encodes `SdsMessage`
- Field-number versioning composes naturally with the explicit `schemaVersion`
- Encoders for the new types compose on top of the existing `SdsMessage` one
— no new codec to maintain
**Encoders to add:**
- `UnacknowledgedMessage` (wraps `SdsMessage` + `sendTime: int64` unix-ms + `resendAttempts: uint32`)
- `IncomingMessage` (wraps `SdsMessage` + `missingDeps: repeated bytes`)
- `OutgoingRepairEntry` / `IncomingRepairEntry` (HistoryEntry + Time + optional cachedMessage)
- `OutgoingRepairKV` / `IncomingRepairKV` (msgId + entry — flattened map; see §6)
- `ChannelMeta` (top-level)
`Time` is serialized as `int64` unix milliseconds. The wall-clock semantics
are already used by the protocol itself (`getTime()` in `wrapOutgoingMessage`).
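A conversion between `std/times.Time` and unix milliseconds might look like the sketch below. `toUnixMs` and `fromUnixMs` are illustrative names, the sketch assumes non-negative timestamps, and sub-millisecond precision is dropped on encode.

```nim
import std/times

proc toUnixMs(t: Time): int64 =
  ## Whole milliseconds since the unix epoch; truncates finer precision.
  t.toUnix * 1000 + int64(t.nanosecond div 1_000_000)

proc fromUnixMs(ms: int64): Time =
  ## Inverse of toUnixMs for ms >= 0.
  initTime(ms div 1000, NanosecondRange((ms mod 1000) * 1_000_000))
```

Because of the truncation, a round trip reproduces the original `Time` only when it already sits on a millisecond boundary, which is worth keeping in mind when writing equality assertions in codec tests.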
**On durability.** Because the worker buffers blobs, `saveChannelMeta`
returning `ok()` means "the blob was accepted by the worker," not "the blob
is fsynced." That is the worker's contract to manage. nim-sds's own
invariant — one snapshot save per protocol op, after all in-memory mutation
completes — is satisfied as soon as the worker accepts the blob, because
on recovery the worker replays its own buffer in order, so the snapshot
nim-sds last issued is the snapshot nim-sds will see on next `loadChannel`.
---
## 2. New `Persistence` Interface
Replace the current 13 procs in `sds/types/persistence.nim` with **5**:
```nim
type Persistence* = object
  saveChannelMeta*: proc(
    channelId: SdsChannelID, meta: ChannelMeta
  ): Future[Result[void, string]] {.async: (raises: []), gcsafe.}
  updateHistory*: proc(
    channelId: SdsChannelID, update: HistoryUpdate
  ): Future[Result[void, string]] {.async: (raises: []), gcsafe.}
  loadChannel*: proc(
    channelId: SdsChannelID
  ): Future[Result[ChannelData, string]] {.async: (raises: []), gcsafe.}
  dropChannel*: proc(
    channelId: SdsChannelID
  ): Future[Result[void, string]] {.async: (raises: []), gcsafe.}
  setRetrievalHint*: proc(
    msgId: SdsMessageID, hint: seq[byte]
  ): Future[Result[void, string]] {.async: (raises: []), gcsafe.}
```
### Atomicity contract (documented in the interface comment)
> Backends SHOULD execute `saveChannelMeta` and the immediately following
> `updateHistory` call within a single transaction when both arrive together
> from the same protocol op. nim-sds always issues them back-to-back under
> the channel lock, with no `await`-of-other-work in between, so the backend
> can either (a) buffer `saveChannelMeta` until the next `updateHistory` or
> `flush`, or (b) use a `txn(channelId)` handle. Variant (b) is cleaner; see
> §3.2 for the optional `beginTxn`/`commitTxn` extension.
### Backend assumption: schema-agnostic KV blob store
The target backend is the existing schema-agnostic KV persistence module in
the sibling repo. It stores opaque `(key: bytes) → (value: bytes)` blobs with
its own crash-consistency guarantees. Therefore:
- nim-sds owns the wire format end-to-end (no SQL schema to coordinate)
- The "single transaction per op" requirement reduces to "two KV puts per
op": `meta:<channelId>` and `history:<channelId>:<msgId>` (one or more)
- The backend's existing batch/atomicity primitives are what guarantee
crash consistency — nim-sds doesn't need transaction-handle plumbing
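The key layout named above can be expressed as two small helpers. This is a sketch only: `metaKey` and `historyKey` are hypothetical, ids are modelled as strings, and it assumes ids never contain `:` (otherwise the separator would need escaping or a length prefix).

```nim
proc metaKey(channelId: string): string =
  ## Key for the single snapshot blob of a channel.
  "meta:" & channelId

proc historyKey(channelId, msgId: string): string =
  ## Key for one history entry of a channel.
  "history:" & channelId & ":" & msgId
```

One op then writes `metaKey(ch)` once and `historyKey(ch, id)` once per appended message, which is the "two KV puts per op" shape in the simplest case.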
---
## 3. Refactor Plan
### Phase 0 — Pre-work (no behavior change)
| Step | File(s) | Verify |
|------|---------|--------|
| 0.1 Add `ChannelMeta`, `HistoryUpdate`, `ChannelData` types | new `sds/types/channel_meta.nim`, `sds/types/history_update.nim` | `nimble c sds.nim` compiles |
| 0.2 Add protobuf encoders/decoders for new types | extend `sds/protobuf.nim` | round-trip unit tests |
| 0.3 Add `tests/test_snapshot_codec.nim` | new test file | `nimble test` passes; covers empty, single-entry, full-buffer, repair-heavy cases |
### Phase 1 — New interface alongside old
| Step | File(s) | Verify |
|------|---------|--------|
| 1.1 Add new 5-proc `Persistence` type as `PersistenceV2` (rename later) | `sds/types/persistence.nim` | compiles; old interface still works |
| 1.2 Add `noOpPersistenceV2()` for tests | same | `nimble test` passes |
| 1.3 Add `ReliabilityManager.persistenceV2` field, optional | `sds/types/reliability_manager.nim` | one of `persistence` / `persistenceV2` is in use; assert at construction |
### Phase 2 — Migrate protocol ops, one at a time
For each op, the pattern is:
1. Add a `dirty: bool` local accumulator
2. Replace inner `await rm.persistence.X` calls with in-memory mutation + set `dirty = true`
3. At the end of the op (under lock, before `return`), emit at most one `saveChannelMeta` and at most one `updateHistory` call
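The three steps can be illustrated with a synchronous toy version. Everything here is a simplified assumption: `ChannelMeta` is cut down to two fields, `SaveMeta` stands in for the async `saveChannelMeta`, and `applyAcks` is a made-up op.

```nim
type
  ChannelMeta = object
    lamportTimestamp: int64
    outgoingBuffer: seq[string]      # message ids only, for brevity

  SaveMeta = proc(meta: ChannelMeta): bool   # hypothetical sync stand-in

proc applyAcks(meta: var ChannelMeta, acked: openArray[string],
               save: SaveMeta): bool =
  ## Mutate in memory, track dirtiness, save at most once at the end.
  var dirty = false
  for id in acked:
    let idx = meta.outgoingBuffer.find(id)
    if idx >= 0:
      meta.outgoingBuffer.delete(idx)
      dirty = true
  if dirty:
    return save(meta)                # the single save point of the op
  true                               # nothing changed, nothing persisted
```

A contract test can count invocations of `save` and assert that it is 0 for a no-op and exactly 1 otherwise, regardless of how many entries were mutated.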
Order (least risky → highest risk):
| Step | Op | File:line | Verify |
|------|-----|-----------|--------|
| 2.1 | `runRepairSweep` | sds.nim:510 | repair sweep unit test, with failure injection |
| 2.2 | `checkUnacknowledgedMessages` | sds.nim:445 | resend-flow integration test |
| 2.3 | `processIncomingBuffer` → pure (no persistence) | sds.nim:176 | callers will persist; covered by 2.4/2.5 |
| 2.4 | `reviewAckStatus` → pure (no persistence) | sds.nim:36 | covered by 2.5 |
| 2.5 | `unwrapReceivedMessage` | sds.nim:235 | full receive-path tests (paths A/B/C); duplicate early-return must skip save |
| 2.6 | `wrapOutgoingMessage` | sds.nim:87 | send-path tests |
| 2.7 | `markDependenciesMet` | sds.nim:378 | dep-resolution tests |
| 2.8 | `addToHistory` → return appended/evicted lists instead of persisting | sds_utils.nim:81 | covered by 2.5/2.6/2.7 |
| 2.9 | `updateLamportTimestamp` → pure (no persistence) | sds_utils.nim:108 | covered |
| 2.10 | `getOrCreateChannel` use `loadChannel` | sds_utils.nim:289 | bootstrap unit test |
| 2.11 | `removeChannel`, `resetReliabilityManager` → `dropChannel` | sds_utils.nim, sds.nim | wipe tests |
Each step is a small commit. After every step: `nimble test` + `gitnexus_detect_changes` to confirm scope.
### Phase 3 — Remove the old interface
| Step | File(s) | Verify |
|------|---------|--------|
| 3.1 Delete old 13-proc `Persistence` fields | `sds/types/persistence.nim` | compile fails on stragglers — fix |
| 3.2 Rename `PersistenceV2` → `Persistence` | all call sites | full test suite |
| 3.3 Delete `noOpPersistence` (old), keep `noOpPersistenceV2` as `noOpPersistence` | same | tests pass |
| 3.4 Update `library/` FFI thread to construct the new `Persistence` | `library/sds_thread/...` | FFI smoke test on macOS + Linux |
| 3.5 Update `Broker_FFI_API.md` and any docs referencing the old contract | docs | review |
### Phase 4 — (removed)
A reference backend is **not** part of this plan. The schema-agnostic KV
persistence module in the sibling repo is the production backend. Its
authors own the integration adapter that maps the 5 `Persistence` procs onto
KV puts/gets. nim-sds only needs to expose the interface and a working
`noOpPersistence` for its own tests.
---
## 4. Risk Mitigation During Refactor
| Risk | Mitigation |
|------|------------|
| Mid-refactor inconsistency (some ops on new interface, some on old) | Phase 2 keeps both interfaces wired — only one is active per RM via a constructor switch; integration tests run against both |
| Behavior change masked by passing tests | Add `tests/test_persistence_contract.nim` that asserts exact call count per protocol op (before vs after must match the table in `ANALYSIS_SNAPSHOT_SAVE_POINTS.md`) |
| Memory-first mutation pattern preserved by accident | Move *all* persistence calls to the end of the op, after the lock-held mutation block completes. The dirty flag is set *during* mutation; the save fires *after*. If save fails, the in-memory state is still the source of truth for the next op — but now there's only one possible point of divergence per op, not 10. |
| FFI thread breakage | Phase 3.4 is the FFI cutover; smoke test on both `--mm:refc` and `--mm:orc`, macOS and Linux, before declaring done. ASAN run on the FFI example. |
| Snapshot blob growth surprises | Add a `len()` log on `saveChannelMeta` for the first week of integration; fail-loud if any blob exceeds (configurable) 1 MB |
---
## 5. Acceptance Criteria
- [ ] All existing `nimble test` cases pass against the new interface
- [ ] New `tests/test_persistence_contract.nim` enforces exactly the call counts from `ANALYSIS_SNAPSHOT_SAVE_POINTS.md` §"Save Points" table
- [ ] New `tests/test_snapshot_codec.nim` round-trips every `ChannelMeta` variant
- [ ] Failure-injection test: kill persistence between `saveChannelMeta` and `updateHistory` → on restart, the manager loads a self-consistent snapshot (no orphan history entries; no dangling buffer references)
- [ ] FFI smoke (`liblogosdelivery`-style) runs clean on macOS+refc, macOS+orc, Linux+refc, Linux+orc
- [ ] `Broker_FFI_API.md` reflects the new contract
- [ ] Bench: snapshot save rate matches the predicted `S + R` (foreground) and ≤ 0.2/s/channel background floor (with dirty-guard) under a synthetic 50-msg/s workload
- [ ] Snapshot blob size on the bench workload matches the estimate in §7 within 2×; outliers logged
---
## 6. Codec & flattening — where protobuf comes in
### Codec choice
The KV backend stores opaque blobs. The codec that produces the blob is
**internal to nim-sds**. Protobuf is the natural choice because:
- The project already uses protobuf for the SDS wire format
(`sds/protobuf.nim` encodes `SdsMessage`). One codec, one toolchain.
- Field-number versioning gives forward/backward compatibility for free —
pairs naturally with the `schemaVersion` field.
- Repeated message fields encode efficiently and round-trip cleanly.
Concretely: `ChannelMeta` is a top-level protobuf message; `saveChannelMeta`
serializes it to `seq[byte]` and the backend writes that under
`meta:<channelId>`. On load, the backend returns the bytes; nim-sds
deserializes.
### Why flatten `Table[Id, Entry]` to `seq[KV]`
Protobuf's wire format has no first-class "map of bytes-key → message-value"
type in the minimal subset used by `sds/protobuf.nim` (the
`nim-libp2p`-style `minprotobuf`). Even the full proto3 `map<K, V>` is
encoded on the wire as **repeated KV messages anyway** — the map syntax is
just sugar over `repeated Entry { key = 1; value = 2; }`.
So flattening is making the wire shape explicit:
```
ChannelMeta {
  ...
  repeated OutgoingRepairKV outgoingRepairBuffer = 5;
  repeated IncomingRepairKV incomingRepairBuffer = 6;
}
OutgoingRepairKV {
  bytes messageId = 1;
  OutgoingRepairEntry entry = 2;
}
```
The `Table` exists only in memory; the wire and disk form is the flat seq.
Decode rebuilds the `Table` by iterating the seq. Cost: one alloc per entry
on encode/decode — negligible against the I/O it replaces.
`outgoingBuffer` (already a `seq`) and `incomingBuffer` (a `Table` flattened
to `seq[IncomingMessage]` — the key is `message.messageId` so no separate KV
wrapper is needed) follow the same logic.
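
The flatten/rebuild round trip described in this section can be sketched with standard-library types only. `Entry` and `KV` are simplified stand-ins for `OutgoingRepairEntry` and `OutgoingRepairKV`, and sorting by key on flatten is an assumption made here to get a deterministic byte layout; the plan itself does not say how the order is chosen.

```nim
import std/[tables, algorithm]

type
  Entry = object   # stand-in for OutgoingRepairEntry
    minTimeMs: int64
  KV = object      # stand-in for OutgoingRepairKV
    messageId: string
    entry: Entry

proc flatten(t: Table[string, Entry]): seq[KV] =
  ## Table -> seq[KV]. Sorted by key (assumption) so the output is deterministic.
  for id, e in t:
    result.add KV(messageId: id, entry: e)
  result.sort(proc(a, b: KV): int = cmp(a.messageId, b.messageId))

proc rebuild(kvs: seq[KV]): Table[string, Entry] =
  ## seq[KV] -> Table. A later duplicate key overwrites an earlier one.
  result = initTable[string, Entry]()
  for kv in kvs:
    result[kv.messageId] = kv.entry
```

Rebuilding from the flat seq yields a table equal to the original, which is the property the codec round-trip tests rely on.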
---
## 7. Snapshot size estimates
Assumptions (stated explicitly, because every number below derives from them):
| Quantity | Assumed bytes | Source |
|----------|---------------|--------|
| `SdsMessageID` | 32 | typical content-addressed id |
| `SdsParticipantID` | 32 | same |
| `SdsChannelID` | 32 | same |
| `bloomFilter` (serialized, in an `SdsMessage`) | 256 | derived from default `bloomFilterCapacity` and `errorRate` |
| `causalHistory` | 10 entries × ~40 B | `maxCausalHistory = 10` from `reliability_config.nim` |
| `repairRequest` in a wire SdsMessage | up to 3 × ~40 B | `maxRepairRequests = 3` |
| Application payload (`content`) — small | 100 B | typical short chat payload |
| Application payload — medium | 1 KB | richer payload |
| Protobuf framing | ~10% overhead | tag bytes + varints |
**One `SdsMessage` on the wire (no content):** ~700 B
**One `SdsMessage` with 100 B content:** ~800 B
**One `SdsMessage` with 1 KB content:** ~1.7 KB
Per-entry sizes inside `ChannelMeta`:
| Entry | Size (100 B payload) | Size (1 KB payload) | Notes |
|-------|----------------------|---------------------|-------|
| `UnacknowledgedMessage` | ~820 B | ~1.7 KB | SdsMessage + sendTime + resendAttempts |
| `IncomingMessage` | ~950 B | ~1.9 KB | SdsMessage + missingDeps (avg 3 × 32 B) |
| `OutgoingRepairKV` | ~110 B | ~110 B | no cached message, payload-independent |
| `IncomingRepairKV` | ~920 B | ~1.8 KB | **cached serialized SdsMessage dominates** |
Fixed overhead per `ChannelMeta`: ~30 B (schemaVersion + lamportTimestamp + framing).
### Per-channel snapshot size by load
| Profile | outBuf | inBuf | outRepair | inRepair | Size (100 B payload) | Size (1 KB payload) |
|---------|--------|-------|-----------|----------|----------------------|---------------------|
| Idle | 0 | 0 | 0 | 0 | **~30 B** | ~30 B |
| Light chat | 2 | 0 | 0 | 0 | **~1.7 KB** | ~3.5 KB |
| Steady | 5 | 1 | 1 | 1 | **~6 KB** | ~12 KB |
| Busy | 10 | 3 | 3 | 3 | **~14 KB** | ~28 KB |
| Heavy, lossy network (SDS-R churning) | 30 | 10 | 20 | 10 | **~45 KB** | ~95 KB |
| Pathological (resend window full, big repair caches) | 50 | 20 | 30 | 20 | **~75 KB** | ~155 KB |
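
As a cross-check, the per-profile figures can be recomputed from the per-entry sizes. The sketch below hard-codes the estimates from the per-entry table (they are assumptions of this plan, not measurements), and `estimateSnapshotBytes` is a hypothetical helper name.

```nim
type PayloadClass = enum
  pcSmall   # 100 B content
  pcMedium  # 1 KB content

const
  fixedOverhead = 30
  # Per-entry byte estimates copied from the table above: [small, medium].
  unackedBytes: array[PayloadClass, int] = [820, 1700]
  incomingBytes: array[PayloadClass, int] = [950, 1900]
  outRepairBytes: array[PayloadClass, int] = [110, 110]
  inRepairBytes: array[PayloadClass, int] = [920, 1800]

proc estimateSnapshotBytes(outBuf, inBuf, outRepair, inRepair: int,
                           pc: PayloadClass): int =
  ## Sums fixed overhead plus entry count times per-entry estimate.
  fixedOverhead + outBuf * unackedBytes[pc] + inBuf * incomingBytes[pc] +
    outRepair * outRepairBytes[pc] + inRepair * inRepairBytes[pc]
```

For the Steady profile with 100 B payloads this gives 30 + 5·820 + 950 + 110 + 920 = 6110 B, in line with the ~6 KB row; the table values are rounded, so small deviations on the larger profiles are expected.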
### Where the bytes go
| Load profile | Dominant contributor |
|--------------|----------------------|
| Idle / light | Fixed overhead + outgoingBuffer |
| Steady / busy | outgoingBuffer (each entry ~1 KB+) |
| Heavy / lossy | **incomingRepairBuffer** — each KV entry caches a full serialized message for rebroadcast. This is the single biggest amplifier; 20 entries with 1 KB payloads ≈ 36 KB on their own. |
### Implications
1. **Typical write is small (1-30 KB).** Comfortably foldable into the
per-op KV write cost; the backend's blob-write cost is bounded.
2. **`IncomingRepairEntry.cachedMessage` is the size lever to watch.**
Under heavy SDS-R activity it dominates the snapshot. If snapshot size
becomes a bottleneck, the optimization is to drop the cache from the
snapshot and re-serialize from `messageHistory` on demand — at the cost
of more CPU and the corner case where the requested message has been
evicted from history between snapshot save and repair sweep firing.
3. **Heavy profile (~95 KB) at the predicted 6/s/ch save rate = ~570 KB/s
per channel.** A 10-channel heavy node is then ~5.7 MB/s of snapshot
churn — well within KV backend throughput, but worth a real bench
before declaring it OK.
4. **The 1 MB hard cap** suggested in §4 stays appropriate; pathological
profile at 1 KB payload is ~155 KB, leaving healthy headroom.
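
The churn and headroom arithmetic in items 3 and 4 is easy to re-run with different inputs. This is a minimal sketch; `churnBytesPerSec` and `headroom` are hypothetical helpers, and 1 MB is taken as 1,000,000 bytes for simplicity.

```nim
proc churnBytesPerSec(snapshotBytes: int, savesPerSec: float, channels: int): float =
  ## Bytes written per second = snapshot size x save rate x channel count.
  snapshotBytes.float * savesPerSec * channels.float

proc headroom(capBytes, snapshotBytes: int): float =
  ## How many times the snapshot fits under the hard cap.
  capBytes.float / snapshotBytes.float
```

With the figures above, `churnBytesPerSec(95_000, 6.0, 10)` is 5.7 MB/s, and `headroom(1_000_000, 155_000)` is a factor of roughly 6.5 for the pathological profile.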
---
## 8. Persistence failure policy — non-fatal, best-effort
**Change from current branch.** The current implementation treats every
`rePersistenceError` as fatal: the protocol op returns `err()`, the caller
sees a failure, and normal SDS operation breaks even though the in-memory
state is fine. This is wrong for the snapshot model.
**New policy.**
- In-memory state is the **source of truth** for protocol correctness.
Lamport clock, buffers, history, bloom filter — all live in
`ChannelContext` and are mutated under the lock before any persistence
call. SDS message processing never depends on disk state for correctness
within a session.
- Persistence is **best-effort durability**. A failed `saveChannelMeta` or
`updateHistory` does **not** abort the operation, does not return `err`
to the FFI caller, and does not corrupt protocol semantics. The next op
will issue its own snapshot — if that succeeds, on-disk state is
re-synchronised; if it also fails, the one after that tries again.
- Snapshot writes are **idempotent and self-contained.** Each
`saveChannelMeta` blob is the complete current `ChannelMeta`. A missed
write is fully recovered by any later successful write — no log of
deltas to replay, no compensating action needed.
- Bootstrap loss tolerance: if `loadChannel` fails or returns stale state
on restart, the manager starts from whatever it could load (possibly
empty). Peer traffic and SDS-R repair will re-populate it. This is the
expected behaviour of the bloom-rebuilt-from-history design extended to
the meta blob.
**Implementation pattern.** At each save point:
```nim
# end of wrapOutgoingMessage / unwrapReceivedMessage / etc.
if dirty:
  let saveRes = await rm.persistence.saveChannelMeta(channelId, snapshot)
  if saveRes.isErr:
    warn "snapshot save failed; in-memory state unaffected, next op will retry",
      channelId = channelId, detail = saveRes.error
    # DO NOT return err; protocol op succeeded.
if appended.len > 0 or evicted.len > 0:
  let histRes = await rm.persistence.updateHistory(channelId,
    HistoryUpdate(append: appended, evict: evicted))
  if histRes.isErr:
    warn "history update failed; in-memory log authoritative, next op will retry",
      channelId = channelId, detail = histRes.error
return ok(serializedMessage)  # protocol op succeeded regardless
```
**What still returns `err(rePersistenceError)`.** Only operations whose
**semantic intent** is durability:
- `removeChannel`, `resetReliabilityManager` → must confirm `dropChannel`
succeeded; otherwise the caller may assume disk is clean when it isn't.
- `getOrCreateChannel` on first bootstrap → if `loadChannel` errors (vs.
returns empty), surface it so the caller can decide between "start
fresh in memory" and "abort init".
**Impact on §5 acceptance criteria.** Add: failure-injection test must
prove that `wrapOutgoingMessage`, `unwrapReceivedMessage`,
`markDependenciesMet`, `checkUnacknowledgedMessages`, `runRepairSweep` all
return `ok` under 100%-failing persistence, with correct in-memory
behaviour and a recovered on-disk state after persistence is restored.
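
A failure-injection test of that shape might be structured as follows. This is a deliberately simplified, synchronous sketch: `FakePersistence`, `Channel`, `saveSnapshot`, and `sendOp` are hypothetical stand-ins, and the "snapshot" is reduced to a single Lamport value rather than a full `ChannelMeta` blob.

```nim
type
  FakePersistence = ref object  # hypothetical test double
    failing: bool               # when true, every save is rejected
    stored: seq[int64]          # "disk": the last snapshot written, if any
  Channel = object              # stand-in for ChannelContext
    lamport: int64

proc saveSnapshot(p: FakePersistence, snapshot: int64): bool =
  ## Returns false on injected failure; overwrites the stored snapshot on success.
  if p.failing:
    return false
  p.stored = @[snapshot]
  true

proc sendOp(ch: var Channel, p: FakePersistence): bool =
  ## Mutates memory first, then attempts a best-effort save.
  inc ch.lamport
  if not p.saveSnapshot(ch.lamport):
    discard  # real code would log a warning here
  true       # the protocol op succeeds regardless of the save result

when isMainModule:
  let p = FakePersistence(failing: true)
  var ch = Channel()
  for _ in 1 .. 3:
    doAssert ch.sendOp(p)                 # ops succeed under 100% failure
  doAssert ch.lamport == 3 and p.stored.len == 0
  p.failing = false
  doAssert ch.sendOp(p)
  doAssert p.stored == @[4'i64]           # one later save resyncs the disk state
```

The last assertion illustrates the idempotence argument: because each snapshot is complete, a single successful write after any number of failures brings the stored state back in line with memory.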
**Why this is safe.** Each snapshot is a full self-contained blob;
partial-write divergence (the original ANALYSIS §4 critical risk) is
already eliminated by the atomic-blob design. Once that's true, treating
persistence failure as fatal is pure downside — it propagates a
recoverable I/O hiccup into a user-visible protocol failure for no
correctness gain.
---
## 9. What this plan deliberately does NOT do
- Does not add transaction handles — the KV backend's batch primitive is sufficient
- Does not ship a reference backend — the schema-agnostic KV module in the sibling repo is the production backend
- Does not change the bloom filter persistence policy (still rebuilt from history)
- Does not introduce SDS-R repair extension changes
- Does not touch the FFI surface shape beyond construction of `Persistence` — the existing C API is unchanged
- Does not auto-migrate on-disk data from an older format (no production data exists yet; schemaVersion=1 starts clean)

View File

@ -68,6 +68,7 @@ task test, "Run the test suite":
  exec "nim c -r --outdir:build tests/test_bloom.nim"
  exec "nim c -r --outdir:build tests/test_reliability.nim"
  exec "nim c -r --outdir:build tests/test_persistence.nim"
  exec "nim c -r --outdir:build tests/test_snapshot_codec.nim"

task libsdsDynamicWindows, "Generate bindings":
  let outLibNameAndExt = "libsds.dll"

326
sds/snapshot_codec.nim Normal file

@ -0,0 +1,326 @@
## Storage encoding for the snapshot persistence types.
##
## This is the codec nim-sds runs on its side of the persistence boundary
## to turn a `ChannelMeta` (or `ChannelData`, or `HistoryUpdate`) into the
## opaque `seq[byte]` blob the KV persistence backend stores. The KV
## backend treats the blob as fully opaque. See PLAN_SNAPSHOT_PERSISTENCE.md
## §1.5 for why this codec exists at all and §6 for the choice of protobuf.
##
## This is NOT the SDS network wire format — that lives in `sds/protobuf.nim`
## and is unchanged. Encoders for `SdsMessage` and `HistoryEntry` are reused
## from there to avoid maintaining two codecs for the same shape.
{.push raises: [].}
import std/[sets, times]
import libp2p/protobuf/minprotobuf
import ./types/[
  channel_meta, history_update, sds_message, sds_message_id, history_entry,
  unacknowledged_message, incoming_message, repair_entry, reliability_error,
]
import ./protobufutil
import ./protobuf as wire
export channel_meta, history_update
# ---------------------------------------------------------------------------
# Time <-> int64 unix milliseconds
# ---------------------------------------------------------------------------
# The protocol uses `getTime()` (wall clock). For wire stability we encode
# as unix milliseconds in int64 (zigzag not needed — pre-1970 values do not
# occur in practice). Sub-millisecond precision is intentionally dropped:
# the protocol's repair backoff windows are seconds-scale.
proc toUnixMs(t: Time): int64 =
  t.toUnix * 1000'i64 + int64(t.nanosecond div 1_000_000)

proc fromUnixMs(ms: int64): Time =
  let secs = ms div 1000
  let nanos = (ms mod 1000).int * 1_000_000
  initTime(secs, nanos)
# ---------------------------------------------------------------------------
# UnacknowledgedMessage
# ---------------------------------------------------------------------------
proc encodeUnacked(u: UnacknowledgedMessage): ProtoBuffer =
  var pb = initProtoBuffer()
  let msgPb = wire.encode(u.message)
  pb.write(1, msgPb.buffer)
  pb.write(2, uint64(u.sendTime.toUnixMs))
  pb.write(3, uint32(u.resendAttempts))
  pb.finish()
  pb

proc decodeUnacked(buf: seq[byte]): ProtobufResult[UnacknowledgedMessage] =
  let pb = initProtoBuffer(buf)
  var msgBytes: seq[byte]
  if not ?pb.getField(1, msgBytes):
    return err(ProtobufError.missingRequiredField("UnacknowledgedMessage.message"))
  let msg = SdsMessage.decode(msgBytes).valueOr:
    return err(ProtobufError.missingRequiredField("UnacknowledgedMessage.message"))
  var sendMs: uint64
  if not ?pb.getField(2, sendMs):
    return err(ProtobufError.missingRequiredField("UnacknowledgedMessage.sendTime"))
  var attempts: uint32
  discard pb.getField(3, attempts)
  ok(
    UnacknowledgedMessage.init(
      message = msg,
      sendTime = fromUnixMs(int64(sendMs)),
      resendAttempts = int(attempts),
    )
  )
# ---------------------------------------------------------------------------
# IncomingMessage
# ---------------------------------------------------------------------------
proc encodeIncoming(m: IncomingMessage): ProtoBuffer =
  var pb = initProtoBuffer()
  let msgPb = wire.encode(m.message)
  pb.write(1, msgPb.buffer)
  for dep in m.missingDeps:
    pb.write(2, dep) # SdsMessageID is string
  pb.finish()
  pb

proc decodeIncoming(buf: seq[byte]): ProtobufResult[IncomingMessage] =
  let pb = initProtoBuffer(buf)
  var msgBytes: seq[byte]
  if not ?pb.getField(1, msgBytes):
    return err(ProtobufError.missingRequiredField("IncomingMessage.message"))
  let msg = SdsMessage.decode(msgBytes).valueOr:
    return err(ProtobufError.missingRequiredField("IncomingMessage.message"))
  var deps: seq[SdsMessageID]
  discard pb.getRepeatedField(2, deps)
  var depSet = initHashSet[SdsMessageID]()
  for d in deps:
    depSet.incl(d)
  ok(IncomingMessage.init(message = msg, missingDeps = depSet))
# ---------------------------------------------------------------------------
# OutgoingRepairEntry / OutgoingRepairKV
# ---------------------------------------------------------------------------
proc encodeOutRepairEntry(e: OutgoingRepairEntry): ProtoBuffer =
  var pb = initProtoBuffer()
  let histPb = wire.encodeHistoryEntry(e.outHistEntry)
  pb.write(1, histPb.buffer)
  pb.write(2, uint64(e.minTimeRepairReq.toUnixMs))
  pb.finish()
  pb

proc decodeOutRepairEntry(buf: seq[byte]): ProtobufResult[OutgoingRepairEntry] =
  let pb = initProtoBuffer(buf)
  var histBytes: seq[byte]
  if not ?pb.getField(1, histBytes):
    return err(ProtobufError.missingRequiredField("OutgoingRepairEntry.outHistEntry"))
  let histPb = initProtoBuffer(histBytes)
  let entry = ?wire.decodeHistoryEntry(histPb)
  var ms: uint64
  if not ?pb.getField(2, ms):
    return err(ProtobufError.missingRequiredField("OutgoingRepairEntry.minTimeRepairReq"))
  ok(
    OutgoingRepairEntry.init(
      outHistEntry = entry, minTimeRepairReq = fromUnixMs(int64(ms))
    )
  )

proc encodeOutRepairKV(kv: OutgoingRepairKV): ProtoBuffer =
  var pb = initProtoBuffer()
  pb.write(1, kv.messageId)
  let entryPb = encodeOutRepairEntry(kv.entry)
  pb.write(2, entryPb.buffer)
  pb.finish()
  pb

proc decodeOutRepairKV(buf: seq[byte]): ProtobufResult[OutgoingRepairKV] =
  let pb = initProtoBuffer(buf)
  var msgId: SdsMessageID
  if not ?pb.getField(1, msgId):
    return err(ProtobufError.missingRequiredField("OutgoingRepairKV.messageId"))
  var entryBytes: seq[byte]
  if not ?pb.getField(2, entryBytes):
    return err(ProtobufError.missingRequiredField("OutgoingRepairKV.entry"))
  let entry = ?decodeOutRepairEntry(entryBytes)
  ok(OutgoingRepairKV(messageId: msgId, entry: entry))
# ---------------------------------------------------------------------------
# IncomingRepairEntry / IncomingRepairKV
# ---------------------------------------------------------------------------
proc encodeInRepairEntry(e: IncomingRepairEntry): ProtoBuffer =
  var pb = initProtoBuffer()
  let histPb = wire.encodeHistoryEntry(e.inHistEntry)
  pb.write(1, histPb.buffer)
  pb.write(2, e.cachedMessage)
  pb.write(3, uint64(e.minTimeRepairResp.toUnixMs))
  pb.finish()
  pb

proc decodeInRepairEntry(buf: seq[byte]): ProtobufResult[IncomingRepairEntry] =
  let pb = initProtoBuffer(buf)
  var histBytes: seq[byte]
  if not ?pb.getField(1, histBytes):
    return err(ProtobufError.missingRequiredField("IncomingRepairEntry.inHistEntry"))
  let histPb = initProtoBuffer(histBytes)
  let entry = ?wire.decodeHistoryEntry(histPb)
  var cached: seq[byte]
  if not ?pb.getField(2, cached):
    return err(ProtobufError.missingRequiredField("IncomingRepairEntry.cachedMessage"))
  var ms: uint64
  if not ?pb.getField(3, ms):
    return err(ProtobufError.missingRequiredField("IncomingRepairEntry.minTimeRepairResp"))
  ok(
    IncomingRepairEntry.init(
      inHistEntry = entry,
      cachedMessage = cached,
      minTimeRepairResp = fromUnixMs(int64(ms)),
    )
  )

proc encodeInRepairKV(kv: IncomingRepairKV): ProtoBuffer =
  var pb = initProtoBuffer()
  pb.write(1, kv.messageId)
  let entryPb = encodeInRepairEntry(kv.entry)
  pb.write(2, entryPb.buffer)
  pb.finish()
  pb

proc decodeInRepairKV(buf: seq[byte]): ProtobufResult[IncomingRepairKV] =
  let pb = initProtoBuffer(buf)
  var msgId: SdsMessageID
  if not ?pb.getField(1, msgId):
    return err(ProtobufError.missingRequiredField("IncomingRepairKV.messageId"))
  var entryBytes: seq[byte]
  if not ?pb.getField(2, entryBytes):
    return err(ProtobufError.missingRequiredField("IncomingRepairKV.entry"))
  let entry = ?decodeInRepairEntry(entryBytes)
  ok(IncomingRepairKV(messageId: msgId, entry: entry))
# ---------------------------------------------------------------------------
# ChannelMeta (top-level snapshot)
# ---------------------------------------------------------------------------
proc encode*(meta: ChannelMeta): ProtoBuffer =
  var pb = initProtoBuffer()
  pb.write(1, meta.schemaVersion)
  pb.write(2, uint64(meta.lamportTimestamp))
  for u in meta.outgoingBuffer:
    let entryPb = encodeUnacked(u)
    pb.write(3, entryPb.buffer)
  for m in meta.incomingBuffer:
    let entryPb = encodeIncoming(m)
    pb.write(4, entryPb.buffer)
  for kv in meta.outgoingRepairBuffer:
    let entryPb = encodeOutRepairKV(kv)
    pb.write(5, entryPb.buffer)
  for kv in meta.incomingRepairBuffer:
    let entryPb = encodeInRepairKV(kv)
    pb.write(6, entryPb.buffer)
  pb.finish()
  pb

proc decode*(T: type ChannelMeta, buf: seq[byte]): ProtobufResult[T] =
  let pb = initProtoBuffer(buf)
  var meta = ChannelMeta.init()
  var ver: uint32
  if not ?pb.getField(1, ver):
    return err(ProtobufError.missingRequiredField("ChannelMeta.schemaVersion"))
  if ver != ChannelMetaSchemaVersion:
    # Per the contract: refuse loudly rather than silently truncating.
    return err(ProtobufError.missingRequiredField(
      "ChannelMeta.schemaVersion(unsupported)"
    ))
  meta.schemaVersion = ver
  var lts: uint64
  if not ?pb.getField(2, lts):
    return err(ProtobufError.missingRequiredField("ChannelMeta.lamportTimestamp"))
  meta.lamportTimestamp = int64(lts)
  var outBufs, inBufs, outRepBufs, inRepBufs: seq[seq[byte]]
  discard pb.getRepeatedField(3, outBufs)
  for b in outBufs:
    meta.outgoingBuffer.add(?decodeUnacked(b))
  discard pb.getRepeatedField(4, inBufs)
  for b in inBufs:
    meta.incomingBuffer.add(?decodeIncoming(b))
  discard pb.getRepeatedField(5, outRepBufs)
  for b in outRepBufs:
    meta.outgoingRepairBuffer.add(?decodeOutRepairKV(b))
  discard pb.getRepeatedField(6, inRepBufs)
  for b in inRepBufs:
    meta.incomingRepairBuffer.add(?decodeInRepairKV(b))
  ok(meta)

proc serialize*(meta: ChannelMeta): Result[seq[byte], ReliabilityError] =
  ok(encode(meta).buffer)

proc deserializeChannelMeta*(
    data: seq[byte]
): Result[ChannelMeta, ReliabilityError] =
  let m = ChannelMeta.decode(data).valueOr:
    return err(ReliabilityError.reDeserializationError)
  ok(m)
# ---------------------------------------------------------------------------
# ChannelData (bootstrap payload)
# ---------------------------------------------------------------------------
proc encode*(d: ChannelData): ProtoBuffer =
  var pb = initProtoBuffer()
  let metaPb = encode(d.meta)
  pb.write(1, metaPb.buffer)
  for m in d.messageHistory:
    let msgPb = wire.encode(m)
    pb.write(2, msgPb.buffer)
  pb.finish()
  pb

proc decode*(T: type ChannelData, buf: seq[byte]): ProtobufResult[T] =
  let pb = initProtoBuffer(buf)
  var d = ChannelData.init()
  var metaBytes: seq[byte]
  if not ?pb.getField(1, metaBytes):
    return err(ProtobufError.missingRequiredField("ChannelData.meta"))
  d.meta = ?ChannelMeta.decode(metaBytes)
  var histBufs: seq[seq[byte]]
  discard pb.getRepeatedField(2, histBufs)
  for b in histBufs:
    let m = SdsMessage.decode(b).valueOr:
      return err(ProtobufError.missingRequiredField("ChannelData.messageHistory[i]"))
    d.messageHistory.add(m)
  ok(d)
# ---------------------------------------------------------------------------
# HistoryUpdate
# ---------------------------------------------------------------------------
proc encode*(u: HistoryUpdate): ProtoBuffer =
  var pb = initProtoBuffer()
  for m in u.append:
    let msgPb = wire.encode(m)
    pb.write(1, msgPb.buffer)
  for id in u.evict:
    pb.write(2, id)
  pb.finish()
  pb

proc decode*(T: type HistoryUpdate, buf: seq[byte]): ProtobufResult[T] =
  let pb = initProtoBuffer(buf)
  var u = HistoryUpdate.init()
  var appBufs: seq[seq[byte]]
  discard pb.getRepeatedField(1, appBufs)
  for b in appBufs:
    let m = SdsMessage.decode(b).valueOr:
      return err(ProtobufError.missingRequiredField("HistoryUpdate.append[i]"))
    u.append.add(m)
  var ev: seq[SdsMessageID]
  discard pb.getRepeatedField(2, ev)
  u.evict = ev
  ok(u)
{.pop.}

View File

@ -0,0 +1,82 @@
## Atomic snapshot types for the per-channel protocol state.
##
## These types replace the fine-grained mutation operations of the original
## Persistence interface with a single self-contained blob per channel.
## See PLAN_SNAPSHOT_PERSISTENCE.md §1 for the rationale, §6 for the codec
## choice, §7 for size estimates.
##
## Bloom filter is intentionally absent — rebuilt from the message log on
## bootstrap. Message history is also absent — persisted separately via
## `HistoryUpdate` because it is large and append-mostly.
import ./sds_message_id
import ./sds_message
import ./unacknowledged_message
import ./incoming_message
import ./repair_entry
export
  sds_message_id, sds_message, unacknowledged_message, incoming_message,
  repair_entry

const ChannelMetaSchemaVersion* = 1'u32
  ## On-disk format version for ChannelMeta. Decoders MUST refuse to load a
  ## blob whose version they don't know how to interpret, rather than
  ## silently truncating or zero-filling unknown fields.

type
  OutgoingRepairKV* = object
    ## Flattened (key, value) entry from the in-memory
    ## `outgoingRepairBuffer: Table[SdsMessageID, OutgoingRepairEntry]`.
    ## Protobuf has no first-class map type in the minprotobuf subset we
    ## use; even proto3 `map<K,V>` is wire-encoded as repeated KV messages.
    ## Flattening to a `seq[KV]` makes that shape explicit.
    messageId*: SdsMessageID
    entry*: OutgoingRepairEntry

  IncomingRepairKV* = object
    ## Flattened (key, value) entry from
    ## `incomingRepairBuffer: Table[SdsMessageID, IncomingRepairEntry]`.
    messageId*: SdsMessageID
    entry*: IncomingRepairEntry

  ChannelMeta* = object
    ## Atomic snapshot of the fast-changing per-channel protocol state.
    ## Persisted as one blob per `saveChannelMeta` call. The `Table`-backed
    ## buffers in `ChannelContext` are flattened to `seq`s here for stable
    ## serialization and deterministic ordering on disk.
    schemaVersion*: uint32
    lamportTimestamp*: int64
    outgoingBuffer*: seq[UnacknowledgedMessage]
      ## Sent-but-not-yet-acked. Order matches insertion order in
      ## ChannelContext.outgoingBuffer; preserved on save/load.
    incomingBuffer*: seq[IncomingMessage]
      ## Received-but-not-yet-deliverable; key in memory is
      ## `message.messageId`, so no KV wrapper is needed.
    outgoingRepairBuffer*: seq[OutgoingRepairKV]
    incomingRepairBuffer*: seq[IncomingRepairKV]

  ChannelData* = object
    ## Returned by `loadChannel` on `getOrCreateChannel` bootstrap.
    ## Carries everything needed to rebuild the in-memory `ChannelContext`
    ## from a clean restart.
    meta*: ChannelMeta
    messageHistory*: seq[SdsMessage]
      ## MUST be ordered oldest-first (lamportTimestamp ASC, tie-break
      ## msg_id ASC). Bloom filter is rebuilt from this on load; FIFO
      ## eviction at maxMessageHistory relies on this ordering. Backend
      ## contract; the loader SHOULD validate.

proc init*(T: type ChannelMeta): T =
  ## Empty snapshot with current schema version. Used as the bootstrap
  ## payload when no on-disk state exists for a channel.
  T(
    schemaVersion: ChannelMetaSchemaVersion,
    lamportTimestamp: 0,
    outgoingBuffer: @[],
    incomingBuffer: @[],
    outgoingRepairBuffer: @[],
    incomingRepairBuffer: @[],
  )

proc init*(T: type ChannelData): T =
  T(meta: ChannelMeta.init(), messageHistory: @[])

View File

@ -0,0 +1,30 @@
## Combined append/evict payload for the persistence message log.
##
## One protocol operation may deliver multiple messages in sequence (e.g.
## `unwrapReceivedMessage` followed by a `processIncomingBuffer` cascade
## that unblocks several buffered messages), and may also evict the oldest
## entries past `maxMessageHistory` in the same operation. Bundling all of
## those into a single `HistoryUpdate` lets the persistence backend execute
## the append + evict as one atomic batch alongside the matching
## `saveChannelMeta` call.
import ./sds_message_id
import ./sds_message
export sds_message_id, sds_message
type HistoryUpdate* = object
  ## When BOTH `append` and `evict` are empty, callers SHOULD skip the
  ## persistence call entirely. The Persistence interface treats an
  ## "empty" update as a no-op but the round-trip is not free.
  append*: seq[SdsMessage]
    ## New delivered messages, in delivery order. Order matters for the
    ## backend's append-only log; nim-sds preserves causal ordering when
    ## populating this list.
  evict*: seq[SdsMessageID]
    ## Oldest messages now past `maxMessageHistory`. Backend deletes by id.

proc init*(T: type HistoryUpdate): T =
  T(append: @[], evict: @[])

proc isEmpty*(u: HistoryUpdate): bool =
  u.append.len == 0 and u.evict.len == 0

View File

@ -0,0 +1,235 @@
## Round-trip tests for the snapshot persistence codec.
## Each `encode` → `decode` cycle must preserve every field exactly.
import std/[times, sets, unittest]
import results
import ../sds/snapshot_codec
import
  ../sds/types/[
    sds_message, sds_message_id, history_entry, unacknowledged_message,
    incoming_message, repair_entry,
  ]

converter toParticipantID(s: string): SdsParticipantID =
  s.SdsParticipantID

proc mkMsg(id: string, ts: int64 = 1, content: seq[byte] = @[]): SdsMessage =
  SdsMessage.init(
    messageId = id,
    lamportTimestamp = ts,
    causalHistory = @[],
    channelId = "chan",
    content = content,
    bloomFilter = @[],
    senderId = "alice",
    repairRequest = @[],
  )

proc mkHistEntry(id: string): HistoryEntry =
  HistoryEntry.init(messageId = id, senderId = "alice")

suite "snapshot codec — ChannelMeta":
  test "empty meta round-trips":
    let m = ChannelMeta.init()
    let buf = encode(m).buffer
    let dec = ChannelMeta.decode(buf).get()
    check:
      dec.schemaVersion == ChannelMetaSchemaVersion
      dec.lamportTimestamp == 0
      dec.outgoingBuffer.len == 0
      dec.incomingBuffer.len == 0
      dec.outgoingRepairBuffer.len == 0
      dec.incomingRepairBuffer.len == 0

  test "meta with lamport and single outgoing entry":
    var m = ChannelMeta.init()
    m.lamportTimestamp = 42
    m.outgoingBuffer.add(
      UnacknowledgedMessage.init(
        message = mkMsg("m1", 42, @[1.byte, 2, 3]),
        sendTime = fromUnix(1_700_000_000),
        resendAttempts = 2,
      )
    )
    let buf = encode(m).buffer
    let dec = ChannelMeta.decode(buf).get()
    check:
      dec.lamportTimestamp == 42
      dec.outgoingBuffer.len == 1
      dec.outgoingBuffer[0].message.messageId == "m1"
      dec.outgoingBuffer[0].message.content == @[1.byte, 2, 3]
      dec.outgoingBuffer[0].resendAttempts == 2
      dec.outgoingBuffer[0].sendTime.toUnix == 1_700_000_000

  test "meta with incoming entry carrying missing deps":
    var m = ChannelMeta.init()
    var deps = initHashSet[SdsMessageID]()
    deps.incl("dep1")
    deps.incl("dep2")
    m.incomingBuffer.add(
      IncomingMessage.init(message = mkMsg("m2"), missingDeps = deps)
    )
    let buf = encode(m).buffer
    let dec = ChannelMeta.decode(buf).get()
    check:
      dec.incomingBuffer.len == 1
      dec.incomingBuffer[0].message.messageId == "m2"
      dec.incomingBuffer[0].missingDeps == deps

  test "meta with both repair buffers populated":
    var m = ChannelMeta.init()
    m.outgoingRepairBuffer.add(
      OutgoingRepairKV(
        messageId: "missing1",
        entry: OutgoingRepairEntry.init(
          outHistEntry = mkHistEntry("missing1"),
          minTimeRepairReq = fromUnix(1_700_000_100),
        ),
      )
    )
    m.incomingRepairBuffer.add(
      IncomingRepairKV(
        messageId: "requested1",
        entry: IncomingRepairEntry.init(
          inHistEntry = mkHistEntry("requested1"),
          cachedMessage = @[9.byte, 8, 7, 6],
          minTimeRepairResp = fromUnix(1_700_000_200),
        ),
      )
    )
    let buf = encode(m).buffer
    let dec = ChannelMeta.decode(buf).get()
    check:
      dec.outgoingRepairBuffer.len == 1
      dec.outgoingRepairBuffer[0].messageId == "missing1"
      dec.outgoingRepairBuffer[0].entry.minTimeRepairReq.toUnix ==
        1_700_000_100
      dec.incomingRepairBuffer.len == 1
      dec.incomingRepairBuffer[0].messageId == "requested1"
      dec.incomingRepairBuffer[0].entry.cachedMessage == @[9.byte, 8, 7, 6]
      dec.incomingRepairBuffer[0].entry.minTimeRepairResp.toUnix ==
        1_700_000_200

  test "fully-populated meta — multiple entries each buffer":
    var m = ChannelMeta.init()
    m.lamportTimestamp = 999
    for i in 0 ..< 5:
      m.outgoingBuffer.add(
        UnacknowledgedMessage.init(
          message = mkMsg("o" & $i, int64(i), @[byte(i)]),
          sendTime = fromUnix(1_700_000_000 + i.int64),
          resendAttempts = i,
        )
      )
    for i in 0 ..< 3:
      var deps = initHashSet[SdsMessageID]()
      deps.incl("dep" & $i)
      m.incomingBuffer.add(
        IncomingMessage.init(
          message = mkMsg("i" & $i, int64(100 + i)), missingDeps = deps
        )
      )
    for i in 0 ..< 4:
      m.outgoingRepairBuffer.add(
        OutgoingRepairKV(
          messageId: "or" & $i,
          entry: OutgoingRepairEntry.init(
            outHistEntry = mkHistEntry("or" & $i),
            minTimeRepairReq = fromUnix(1_700_000_300 + i.int64),
          ),
        )
      )
    for i in 0 ..< 2:
      m.incomingRepairBuffer.add(
        IncomingRepairKV(
          messageId: "ir" & $i,
          entry: IncomingRepairEntry.init(
            inHistEntry = mkHistEntry("ir" & $i),
            cachedMessage = @[byte(i), byte(i + 1)],
            minTimeRepairResp = fromUnix(1_700_000_400 + i.int64),
          ),
        )
      )
    let buf = encode(m).buffer
    let dec = ChannelMeta.decode(buf).get()
    check:
      dec.lamportTimestamp == 999
      dec.outgoingBuffer.len == 5
      dec.incomingBuffer.len == 3
      dec.outgoingRepairBuffer.len == 4
      dec.incomingRepairBuffer.len == 2
      dec.outgoingBuffer[4].message.messageId == "o4"
      dec.outgoingBuffer[4].resendAttempts == 4
      dec.outgoingRepairBuffer[3].messageId == "or3"
      dec.incomingRepairBuffer[1].entry.cachedMessage == @[1.byte, 2]

  test "decoder rejects unknown schemaVersion":
    var m = ChannelMeta.init()
    m.schemaVersion = 999'u32
    let buf = encode(m).buffer
    check ChannelMeta.decode(buf).isErr

suite "snapshot codec — ChannelData":
  test "empty channel data round-trips":
    let d = ChannelData.init()
    let buf = encode(d).buffer
    let dec = ChannelData.decode(buf).get()
    check:
      dec.meta.schemaVersion == ChannelMetaSchemaVersion
      dec.messageHistory.len == 0

  test "channel data with meta and history preserves order":
    var d = ChannelData.init()
    d.meta.lamportTimestamp = 17
    d.messageHistory.add(mkMsg("h1", 1))
    d.messageHistory.add(mkMsg("h2", 2))
    d.messageHistory.add(mkMsg("h3", 3))
    let buf = encode(d).buffer
    let dec = ChannelData.decode(buf).get()
    check:
      dec.meta.lamportTimestamp == 17
      dec.messageHistory.len == 3
      dec.messageHistory[0].messageId == "h1"
      dec.messageHistory[1].messageId == "h2"
      dec.messageHistory[2].messageId == "h3"

suite "snapshot codec — HistoryUpdate":
  test "empty update reports isEmpty (callers skip persistence)":
    # Per the HistoryUpdate doc: when both append and evict are empty,
    # callers SHOULD skip the persistence call entirely. The codec is not
    # required to round-trip an empty update; minprotobuf's finish()
    # refuses an empty buffer, by design.
    let u = HistoryUpdate.init()
    check u.isEmpty

  test "append-only update":
    var u = HistoryUpdate.init()
    u.append.add(mkMsg("a1"))
    u.append.add(mkMsg("a2"))
    let buf = encode(u).buffer
    let dec = HistoryUpdate.decode(buf).get()
    check:
      dec.append.len == 2
      dec.append[0].messageId == "a1"
      dec.append[1].messageId == "a2"
      dec.evict.len == 0

  test "evict-only update":
    var u = HistoryUpdate.init()
    u.evict = @["e1", "e2", "e3"]
    let buf = encode(u).buffer
    let dec = HistoryUpdate.decode(buf).get()
    check:
      dec.append.len == 0
      dec.evict == @["e1", "e2", "e3"]

  test "mixed append + evict update":
    var u = HistoryUpdate.init()
    u.append.add(mkMsg("new"))
    u.evict = @["old1", "old2"]
    let buf = encode(u).buffer
    let dec = HistoryUpdate.decode(buf).get()
    check:
      dec.append.len == 1
      dec.append[0].messageId == "new"
      dec.evict == @["old1", "old2"]