diff --git a/ANALYSIS_SDS_PERSISTENCE.md b/ANALYSIS_SDS_PERSISTENCE.md new file mode 100644 index 0000000..655c7f4 --- /dev/null +++ b/ANALYSIS_SDS_PERSISTENCE.md @@ -0,0 +1,240 @@ +# SDS Workflow & Persistence Layer — Honest Analysis + +## 1. Architecture Overview + +The SDS protocol lives in two layers: + +| Layer | Files | Responsibility | +|-------|-------|---------------| +| **Core types + helpers** | `sds/sds_utils.nim`, `sds/types/*.nim` | State types, Lamport clock, history management, bloom filter, dependency checking | +| **Protocol orchestration** | `sds.nim` (root module) | `wrapOutgoingMessage`, `unwrapReceivedMessage`, `markDependenciesMet`, periodic tasks | + +The `Persistence` interface (`sds/types/persistence.nim`) is a struct of 13 async proc fields. It is injected at `newReliabilityManager` construction time. Default: `noOpPersistence()` — discards all writes, returns empty snapshots. + +## 2. SDS Workflow (excluding `library/`) + +### Send path (`sds.nim:87–174` — `wrapOutgoingMessage`) + +``` +acquire lock + → getOrCreateChannel (loads from persistence if first time) + → updateLamportTimestamp → saveLamport + → serialize bloom filter + → collect expired SDS-R repair requests → removeOutgoingRepair (per entry) + → build causal history → setRetrievalHint (per entry, if hint provider set) + → construct SdsMessage + → add to outgoingBuffer → saveOutgoing + → add to bloom filter (memory only) + → addToHistory → appendLogEntry + removeLogEntry (eviction) + → serialize → return bytes +release lock +``` + +**Persistence calls per send**: 3–5+ depending on repair buffer and history eviction. 
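The lower bound of three calls per send can be illustrated with a small counting sketch. Everything here is a simplified assumption for illustration: `MiniPersistence`, `CallCounter`, `countingPersistence` and `sendOnce` are hypothetical names, the procs are synchronous and return `bool`, whereas the real interface is async and returns `Result`.

```nim
# Hypothetical, simplified stand-in for the struct-of-procs interface.
type
  MiniPersistence = object
    saveLamport: proc(channelId: string, ts: int64): bool
    saveOutgoing: proc(channelId, msgId: string): bool
    appendLogEntry: proc(channelId, msgId: string): bool

  CallCounter = ref object
    calls: int

proc countingPersistence(counter: CallCounter): MiniPersistence =
  ## Accepts every write and tallies how many calls were made.
  result.saveLamport = proc(channelId: string, ts: int64): bool =
    inc counter.calls
    true
  result.saveOutgoing = proc(channelId, msgId: string): bool =
    inc counter.calls
    true
  result.appendLogEntry = proc(channelId, msgId: string): bool =
    inc counter.calls
    true

proc sendOnce(p: MiniPersistence, channelId, msgId: string, ts: int64): bool =
  ## Minimal send: no expired repair entries, no hint provider, no eviction.
  p.saveLamport(channelId, ts) and
    p.saveOutgoing(channelId, msgId) and
    p.appendLogEntry(channelId, msgId)

let counter = CallCounter()
let ok = sendOnce(countingPersistence(counter), "ch1", "m1", 1)
echo ok, " ", counter.calls
```

Each expired repair entry, retrieval hint, or history eviction would add one more call on top of this baseline.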
+ +### Receive path (`sds.nim:235–376` — `unwrapReceivedMessage`) + +``` +extractChannelId + deserialize +getOrCreateChannel (may loadAllForChannel) +→ cleanup repair buffers: removeOutgoingRepair, removeIncomingRepair +→ duplicate check (return early if in history) +→ add to bloom filter (memory only) +→ updateLamportTimestamp → saveLamport +→ reviewAckStatus → removeOutgoing (per acked message) +→ process SDS-R repair requests → removeOutgoingRepair + saveIncomingRepair (per entry) +→ check dependencies: + - all met, no buffer deps: addToHistory → appendLogEntry; unblock buffered → saveIncoming per; processIncomingBuffer + - all met, but deps in buffer: saveIncoming + - missing deps: saveIncoming; create SDS-R outgoing entries → saveOutgoingRepair per +return +``` + +**Persistence calls per receive**: 4–15+ depending on repair entries, ack status, and dependency resolution depth. + +### Background tasks (`sds.nim:487–571`) + +| Task | Interval | Persistence calls | +|------|----------|-------------------| +| `periodicBufferSweep` | `bufferSweepInterval` | `saveOutgoing` or `removeOutgoing` per resend/expiry | +| `periodicSyncMessage` | `syncMessageInterval` | None (callback only) | +| `periodicRepairSweep` | `repairSweepInterval` | `removeIncomingRepair`, `removeOutgoingRepair` per expired entry | + +Background tasks **discard** persistence errors (`discard await rm.runRepairSweep()` at line 568, `discard await rm.checkUnacknowledgedMessages(channelId)` at line 494). + +### Bootstrap (`sds/sds_utils.nim:289–322` — `getOrCreateChannel`) + +``` +loadAllForChannel → ChannelSnapshot + → populate lamportTimestamp + → populate messageHistory + rebuild bloom filter from it + → populate outgoingBuffer, incomingBuffer + → populate outgoingRepairBuffer, incomingRepairBuffer +``` + +**Bloom filter is never persisted** — rebuilt from message history. This is documented and intentional. + +## 3. 
Persistence Interface Shape (SQLite Backend Perspective) + +The 13 operations map naturally to SQLite tables: + +| Operation | SQLite analogue | +|-----------|----------------| +| `saveLamport` | `UPSERT INTO lamport_clocks (channel_id, ts)` | +| `appendLogEntry` | `INSERT INTO message_log (channel_id, msg_id, blob)` | +| `removeLogEntry` | `DELETE FROM message_log WHERE msg_id = ?` | +| `setRetrievalHint` | `UPDATE message_log SET hint = ? WHERE msg_id = ?` | +| `saveOutgoing` | `UPSERT INTO outgoing_buffer (channel_id, msg_id, blob)` | +| `removeOutgoing` | `DELETE FROM outgoing_buffer WHERE msg_id = ?` | +| `saveIncoming` | `UPSERT INTO incoming_buffer (channel_id, msg_id, blob)` | +| `removeIncoming` | `DELETE FROM incoming_buffer WHERE msg_id = ?` | +| `saveOutgoingRepair` | `UPSERT INTO outgoing_repair (channel_id, msg_id, blob)` | +| `removeOutgoingRepair` | `DELETE FROM outgoing_repair WHERE msg_id = ?` | +| `saveIncomingRepair` | `UPSERT INTO incoming_repair (channel_id, msg_id, blob)` | +| `removeIncomingRepair` | `DELETE FROM incoming_repair WHERE msg_id = ?` | +| `dropChannel` | `DELETE FROM * WHERE channel_id = ?` (all tables) | +| `loadAllForChannel` | `SELECT * FROM * WHERE channel_id = ?` (all tables) | + +Minimum schema: 5 tables (lamport_clocks, message_log, outgoing_buffer, incoming_buffer, repair_entries with a direction column — or 6 if outgoing/incoming repair are separated). + +--- + +## 4. Risk Analysis + +### CRITICAL — No Transactional Atomicity Across Persistence Calls + +**Risk level: HIGH** + +Every protocol operation makes **multiple independent persistence calls**. Example from `unwrapReceivedMessage`: + +``` +removeOutgoingRepair ← succeeds +removeIncomingRepair ← succeeds +saveLamport ← succeeds +removeOutgoing ← succeeds +appendLogEntry ← FAILS +``` + +If `appendLogEntry` fails mid-way, the in-memory state has already been mutated (bloom filter updated, buffers modified, Lamport clock advanced). 
The function returns `err()` to the caller, but: + +1. **In-memory state is now ahead of disk state.** The message is in the bloom filter and history in memory but not on disk. +2. **On restart, the snapshot will be stale.** `loadAllForChannel` rebuilds from disk — the message won't be in history, bloom filter will be rebuilt without it, but other nodes may already consider it delivered. +3. **There is no rollback of prior successful persistence calls.** The Lamport clock is already persisted at the new value, repair buffer entries are already deleted. + +**Impact**: After a crash following a partial persistence failure, the node's state diverges from what peers believe. Causal ordering assumptions break. Duplicate delivery or permanent buffering of dependent messages becomes possible. + +**Mitigation for a SQLite backend**: Wrap all persistence calls within a single protocol operation in one `BEGIN … COMMIT` transaction. The current interface design (individual proc fields) makes this structurally impossible — there's no transaction boundary concept. + +### HIGH — In-Memory Mutation Before Persistence Confirmation + +**Risk level: HIGH** + +Throughout the codebase, the pattern is: + +```nim +# mutate in-memory state +channel.outgoingRepairBuffer.del(msg.messageId) # memory mutated +# then persist +(await rm.persistence.removeOutgoingRepair(...)).isOkOr: + return err(...) # too late to undo memory +``` + +This appears in `unwrapReceivedMessage` (lines 256–261), `wrapOutgoingMessage` (lines 131–133), `reviewAckStatus` (lines 77–81), and throughout `processIncomingBuffer`. + +If persistence fails, the function returns an error, but the in-memory state has already been modified. The caller cannot retry because the state is now inconsistent. + +**Exception**: `addToHistory` (sds_utils.nim:91–92) correctly mutates memory first then persists, but on failure, the memory mutation is **not rolled back**. 
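The difference between the two orderings can be sketched with a persistence call that always fails. This is a minimal synchronous illustration, not the code in `sds.nim`: `Channel`, `RemoveProc`, `removeMemoryFirst` and `removePersistFirst` are hypothetical names.

```nim
import std/tables

type
  Channel = object
    repairBuffer: Table[string, int]      # msgId -> attempts (illustrative)

  RemoveProc = proc(msgId: string): bool  # hypothetical sync persistence call

proc removeMemoryFirst(ch: var Channel, persistRemove: RemoveProc,
                       msgId: string): bool =
  ## Pattern described above: memory changes before the write is confirmed.
  ch.repairBuffer.del(msgId)
  persistRemove(msgId)

proc removePersistFirst(ch: var Channel, persistRemove: RemoveProc,
                        msgId: string): bool =
  ## Alternative: touch memory only once the backend accepted the write.
  if not persistRemove(msgId):
    return false
  ch.repairBuffer.del(msgId)
  true

let failing: RemoveProc = proc(msgId: string): bool = false

var a = Channel(repairBuffer: {"m1": 1}.toTable)
var b = Channel(repairBuffer: {"m1": 1}.toTable)
let okA = removeMemoryFirst(a, failing, "m1")
let okB = removePersistFirst(b, failing, "m1")
echo(okA, " ", ("m1" in a.repairBuffer))  # false false: memory is ahead of disk
echo(okB, " ", ("m1" in b.repairBuffer))  # false true: memory still matches disk
```

Both calls report failure, but only the second leaves the caller in a state from which a retry is meaningful.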
+ +### HIGH — Background Tasks Silently Swallow Persistence Errors + +**Risk level: MEDIUM-HIGH** + +```nim +# sds.nim:494 +discard await rm.checkUnacknowledgedMessages(channelId) + +# sds.nim:568 +discard await rm.runRepairSweep() +``` + +`checkUnacknowledgedMessages` modifies `channel.outgoingBuffer` (line 478: `channel.outgoingBuffer = newOutgoingBuffer`) and persists entries. If persistence fails partway through, the in-memory buffer has already been rewritten. The `discard` means the error isn't even visible to any caller. + +The comment says "next tick retries" — but next tick operates on the already-mutated in-memory state, not the stale disk state. After a restart, disk state wins and the divergence materializes. + +### MEDIUM — History Eviction Is Multi-Step Without Atomicity + +**Risk level: MEDIUM** + +`addToHistory` (sds_utils.nim:81–106): + +```nim +channel.messageHistory[msg.messageId] = msg # insert +(await rm.persistence.appendLogEntry(...)).isOkOr: # persist insert + return err(...) +while channel.messageHistory.len > max: + # evict oldest + channel.messageHistory.del(firstKey) + (await rm.persistence.removeLogEntry(...)).isOkOr: # persist eviction + return err(...) +``` + +If the append succeeds but an eviction `removeLogEntry` fails: on restart, the history will contain entries beyond `maxMessageHistory`. Not catastrophic but violates the capacity invariant and could grow unbounded over repeated failures. + +### MEDIUM — `dropChannel` Atomicity Depends Entirely on Backend + +```nim +# sds_utils.nim:27-35 +(await rm.persistence.dropChannel(channelId)).isOkOr: + return err(reliabilityErr(error)) +``` + +The comment on `persistence.nim:103-106` says "Backends should implement this atomically (e.g. one BEGIN/COMMIT)." Good — but there's no enforcement. A naive SQLite backend that does `DELETE FROM t1; DELETE FROM t2; ...` without a transaction could leave partial state. 
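The partial-state outcome can be reproduced without SQLite. The sketch below uses an in-memory `Store` with three tables and an injected failure; `Store`, `dropNaive`, `dropAtomic` and `failAfter` are hypothetical names for illustration, and the copy-then-swap in `dropAtomic` only imitates the effect a `BEGIN … COMMIT` pair would give a real backend.

```nim
import std/tables

type Store = object
  ## Hypothetical in-memory stand-in for three backend tables.
  lamport: Table[string, int64]
  log: Table[string, seq[string]]
  outgoing: Table[string, seq[string]]

proc dropNaive(s: var Store, channelId: string, failAfter: int): bool =
  ## Deletes table by table. `failAfter` injects a failure after that many
  ## deletes, so earlier deletes stay applied.
  var done = 0
  s.lamport.del(channelId)
  inc done
  if done >= failAfter: return false
  s.log.del(channelId)
  inc done
  if done >= failAfter: return false
  s.outgoing.del(channelId)
  true

proc dropAtomic(s: var Store, channelId: string, failAfter: int): bool =
  ## Works on a copy and swaps it in only if every delete succeeded.
  var staged = s
  if not dropNaive(staged, channelId, failAfter):
    return false
  s = staged
  true

proc sample(): Store =
  result.lamport["ch1"] = 7
  result.log["ch1"] = @["m1"]
  result.outgoing["ch1"] = @["m2"]

var naive = sample()
var atomic = sample()
discard dropNaive(naive, "ch1", failAfter = 1)
discard dropAtomic(atomic, "ch1", failAfter = 1)
echo(("ch1" in naive.lamport), " ", ("ch1" in naive.log))    # false true
echo(("ch1" in atomic.lamport), " ", ("ch1" in atomic.log))  # true true
```

After the injected failure the naive store has lost its Lamport row but kept the log, which is exactly the partial state a restart would then load.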
+ +### MEDIUM — `ChannelSnapshot.messageHistory` Ordering Assumption + +```nim +# persistence.nim:41-42 +messageHistory*: seq[SdsMessage] + ## MUST be ordered oldest-first. +``` + +The contract says "MUST be ordered oldest-first" — but there's no validation in `getOrCreateChannel`. If a SQLite backend returns messages in wrong order (e.g. missing `ORDER BY lamport_timestamp, message_id`), the `OrderedTable` insertion order will be wrong, corrupting causal history tail selection and FIFO eviction silently. + +### LOW — Bloom Filter Rebuild Correctness + +The bloom filter is rebuilt from `messageHistory` on bootstrap — which is capped at `maxMessageHistory` entries. Messages evicted from history won't be in the rebuilt bloom filter. This means: + +- After restart, the bloom filter covers fewer messages than before the crash. +- Peers may believe we have messages (based on a pre-crash bloom snapshot they received) that we no longer claim to have. +- This can trigger unnecessary SDS-R repair requests. + +This is a **known design tradeoff**, documented in `persistence.nim:30-32`. Impact is limited to repair overhead, not correctness. + +### LOW — No Backend Exists in This Repo + +There is no SQLite backend (or any real backend) in nim-sds. The `noOpPersistence()` default means: + +- All tests run without durability. +- The persistence interface is untested against real I/O failure modes. +- Any bugs in the interface contract (ordering, atomicity) won't surface until a backend is integrated. + +--- + +## 5. Summary Verdict + +| Area | Grade | Notes | +|------|-------|-------| +| Interface design | **B+** | Clean, well-documented, 13 focused operations. Missing: transaction boundaries. | +| Error propagation | **B** | Consistent `Result[T, string]` → `rePersistenceError` mapping. But background tasks discard errors. | +| In-memory/disk consistency | **D** | No rollback on partial failure. Memory-first mutation pattern throughout. 
| +| Atomicity | **D** | Multi-call operations have no transaction concept. Partial writes are structurally possible. | +| Bootstrap correctness | **B-** | Works correctly IF backend orders history right. No validation. Bloom rebuild is lossy by design. | +| Test coverage of persistence | **F** | Zero tests exercise a real backend. All tests use noOpPersistence. | + +### Recommendations + +1. **Add a transaction/batch concept to the Persistence interface.** Even a simple `beginBatch`/`commitBatch` pair would let a SQLite backend wrap multi-step operations atomically. +2. **Reverse the mutation order**: persist first, mutate memory on success. This eliminates the in-memory-ahead-of-disk divergence. +3. **Don't discard background task results.** At minimum, log them. Better: track failure counts and surface them via a health check callback. +4. **Validate `ChannelSnapshot` ordering in `getOrCreateChannel`.** Assert `lamportTimestamp` monotonicity on the loaded `messageHistory`. +5. **Write integration tests with a real (in-memory SQLite) backend** that exercises failure injection — kill persistence mid-operation and verify recovery. diff --git a/ANALYSIS_SNAPSHOT_SAVE_POINTS.md b/ANALYSIS_SNAPSHOT_SAVE_POINTS.md new file mode 100644 index 0000000..48accca --- /dev/null +++ b/ANALYSIS_SNAPSHOT_SAVE_POINTS.md @@ -0,0 +1,71 @@ +# SDS State Snapshot — Save Points & Call Rate + +Snapshot = `saveChannelMeta(channelId, ChannelMeta)` carrying: `lamportTimestamp`, +`outgoingBuffer`, `incomingBuffer`, `outgoingRepairBuffer`, `incomingRepairBuffer`. +(Bloom filter excluded — rebuilt from history. History persisted separately via +`updateHistory(append, evict)`.) + +**Rule: exactly one snapshot save per protocol operation, fired at the end, under +the lock, only if meta actually changed (dirty flag).** + +## Save Points + +| # | Operation | Save? 
| When | Condition | +|---|-----------|-------|------|-----------| +| 1 | `wrapOutgoingMessage` (sds.nim:163) | **1** | end, before `serializeMessage` | always (lamport + outgoingBuffer always mutate) | +| 2 | `unwrapReceivedMessage` (sds.nim:373) | **0 or 1** | end, before `return` | 0 on duplicate early-return (line 264); else 1 — covers all 3 paths | +| 3 | `markDependenciesMet` (sds.nim:415) | **1** | end, after `processIncomingBuffer` | if any dep matched | +| 4 | `checkUnacknowledgedMessages` (sds.nim:478) | **0 or 1** | end of pass | only if buffer changed (resend/expiry) | +| 5 | `runRepairSweep` (sds.nim:556) | **0..C** | per channel, end of channel loop | one per *dirty* channel only | + +`processIncomingBuffer` and `reviewAckStatus` become pure in-memory helpers — they +never save; the calling op (2 or 3) persists once at the end. + +## Rate Model (per channel) + +Let `S` = sends/s, `R` = non-duplicate receives/s. + +``` +snapshot_rate ≈ S + R (foreground, dominant) + + 1/repairSweepInterval if repair buffers dirty = 0.2/s + + 1/bufferSweepInterval if outgoing buffer dirty = 0.0167/s +``` + +`repairSweepInterval = 5s`, `bufferSweepInterval = 60s`. + +### Background floor (zero traffic) +With dirty-flag guard: **0 saves/s** on a quiet channel (empty buffers → nothing to +persist). Without the guard, the 5s repair sweep alone would force 0.2 saves/s/channel +even when idle — so the dirty-flag guard is mandatory, not optional. + +### Worked examples + +| Scenario | Channels | Per-ch S+R | Foreground | Background | Total snapshot/s | +|----------|----------|-----------|-----------|-----------|------------------| +| Idle | 10 | 0 | 0 | 0 (guarded) | **0** | +| Light chat | 5 | 1 | 5 | ~0.2 | **~5** | +| Busy | 10 | 6 | 60 | ~2 | **~62** | +| Heavy / lossy (SDS-R churning) | 10 | 20 | 200 | ~2 | **~202** | + +Background is negligible vs foreground whenever there is traffic. 
The snapshot rate +is essentially **one write per protocol message** — bounded by network throughput, +not by internal mutation count. + +## Why this is safe for SQLite-on-a-thread + +- 1 snapshot write per message → 1 cross-thread round-trip, 1 `UPSERT` of a single + blob row, foldable into the same transaction as the `updateHistory` call. +- Snapshot blob is **small**: buffer sizes are bounded by traffic-in-flight, not by + `maxMessageHistory`. Typical < a few KB even under load. +- vs. current fine-grained interface (10–15 calls/op), this is a **5–10× reduction** + in cross-thread round-trips and SQLite operations, with atomic crash consistency. + +## Snapshot vs History rate (separation payoff) + +| | Snapshot (`saveChannelMeta`) | History (`updateHistory`) | +|---|---|---| +| Append rate | n/a | S + R_delivered (every delivered msg) | +| Evict | n/a | batched, only past maxMessageHistory=1000 | +| Save rate | S + R (every msg) | S + R_delivered | +| Blob size | small (buffers) | large but append-only | +| Coupling | both fire together at op end → 1 SQLite txn | diff --git a/PLAN_SNAPSHOT_PERSISTENCE.md b/PLAN_SNAPSHOT_PERSISTENCE.md new file mode 100644 index 0000000..b66d0fa --- /dev/null +++ b/PLAN_SNAPSHOT_PERSISTENCE.md @@ -0,0 +1,502 @@ +# SDS Snapshot Persistence — Design & Refactor Plan + +Companion to `ANALYSIS_SDS_PERSISTENCE.md` (problem statement) and +`ANALYSIS_SNAPSHOT_SAVE_POINTS.md` (where & how often we save). + +This document defines: +1. **Data structures** to be persisted (snapshot + history) +2. **New `Persistence` interface** (5 procs replacing the current 13) +3. **Refactor plan** — phased, test-gated, backward-compatible interim state + +--- + +## 1. 
Data Structure Design + +### 1.1 Design principles + +| Principle | Reason | +|-----------|--------| +| Snapshot is **one atomic blob** | Eliminates partial-write divergence (the root cause from ANALYSIS_SDS_PERSISTENCE.md §4) | +| Snapshot is **small** (buffers only, no history) | Keeps per-op write cost ≤ a few KB; foldable into one SQLite txn | +| History is **separate, append-batched** | Large data, append-mostly, queryable by msg_id for SDS-R | +| Bloom filter is **not persisted** | Already the case: rebuilt from history on bootstrap | +| **Versioned wire format** | Allow future schema evolution without breaking on-disk data | +| **Protobuf** serialization | Project already uses it (`sds/protobuf.nim`); keeps one codec | + +### 1.2 `ChannelMeta` — the snapshot payload + +```nim +# sds/types/channel_meta.nim (new file) + +import std/[tables, times] +import ./sds_message_id +import ./unacknowledged_message +import ./incoming_message +import ./repair_entry +export + sds_message_id, unacknowledged_message, incoming_message, repair_entry + +const ChannelMetaSchemaVersion* = 1'u32 + +# KV wrappers are declared first: a separate `type` section cannot refer to +# types that are only declared further down the file. +type + OutgoingRepairKV* = object + messageId*: SdsMessageID + entry*: OutgoingRepairEntry + + IncomingRepairKV* = object + messageId*: SdsMessageID + entry*: IncomingRepairEntry + +type ChannelMeta* = object + ## Atomic snapshot of the fast-changing per-channel protocol state. + ## Persisted as one blob per `saveChannelMeta` call. Bloom filter is + ## intentionally absent: rebuilt from the message log on bootstrap. + ## Message history is also absent: persisted separately via `updateHistory` + ## because it is large and append-mostly. + schemaVersion*: uint32 + ## On-disk format version. Backends MUST refuse to load a meta whose + ## version they don't know how to decode rather than silently truncating + ## or zero-filling unknown fields. + + lamportTimestamp*: int64 + + outgoingBuffer*: seq[UnacknowledgedMessage] + ## Sent-but-not-yet-acked messages. Order matters: the protocol iterates + ## in insertion order for resend-attempt accounting. 
+ + incomingBuffer*: seq[IncomingMessage] + ## Received-but-not-yet-deliverable messages, each carrying its + ## still-missing dependency set. Order is irrelevant; flattened from + ## the in-memory `Table` for wire-friendliness. + + outgoingRepairBuffer*: seq[OutgoingRepairKV] + incomingRepairBuffer*: seq[IncomingRepairKV] + ## SDS-R repair buffers, flattened from in-memory `Table` to seq of + ## (key, value) for stable serialization. +``` + +**Why flatten the `Table`s to `seq`s?** +Protobuf has no native map of `SdsMessageID → object`. Flattening to `seq` of KV +objects gives deterministic encoding and trivial decode-time rebuild of the +in-memory `Table`. The cost is one extra alloc per entry on encode/decode, +negligible vs. the I/O it replaces. + +**Why an explicit `schemaVersion`?** +The current interface has no version field. Adding fields later (e.g., a new +SDS-R counter) silently truncates old data on load. The version makes +incompatibility explicit; backends fail loud instead of corrupting state. + +### 1.3 `HistoryUpdate` — the history-write payload + +```nim +# extension to sds/types/persistence.nim or new history_update.nim + +type HistoryUpdate* = object + ## Combined append/evict for one protocol operation. Empty `append` and + ## empty `evict` ⇒ caller should skip the call entirely. + append*: seq[SdsMessage] + ## New delivered messages, in delivery order (matters for SDS-R retrieval + ## hint correctness and FIFO eviction on the backend side). + evict*: seq[SdsMessageID] + ## Oldest messages now past `maxMessageHistory`. Backend deletes by id. +``` + +`append` is a `seq` (not a single `SdsMessage`) because `processIncomingBuffer` +can deliver a chain of unblocked messages in one call to the parent op +(`unwrapReceivedMessage` / `markDependenciesMet`). 
Sending them all in one +`updateHistory` call keeps the "one save per protocol op" guarantee. + +### 1.4 `ChannelData` — the bootstrap payload + +```nim +type ChannelData* = object + ## Returned by `loadChannel` on `getOrCreateChannel` bootstrap. + ## Carries everything needed to rebuild the in-memory `ChannelContext` + ## from a clean restart. + meta*: ChannelMeta + messageHistory*: seq[SdsMessage] + ## MUST be ordered oldest-first (lamportTimestamp ASC, tie-break msg_id + ## ASC). Bloom filter is rebuilt from this on load; FIFO eviction relies + ## on this ordering. Backend contract; validated by nim-sds on load. +``` + +### 1.5 Storage encoding (internal to nim-sds — not the SDS network wire format) + +**Disambiguation.** The SDS **network** wire format (bytes peers exchange) is +handled by the existing `sds/protobuf.nim` and is untouched by this plan. +What this section defines is the **storage** encoding: the codec nim-sds uses +to turn a `ChannelMeta` Nim object into the opaque `seq[byte]` blob it hands +to `saveChannelMeta`. The KV persistence worker treats that blob as +fully opaque — it stores `(key: bytes) → (value: bytes)` and does its own +buffering/batching of writes. Whether nim-sds uses protobuf, CBOR, or +anything else is invisible to the worker. + +**Why this codec exists at all.** The worker stores bytes; something must +produce those bytes from the in-memory `ChannelMeta`. That responsibility +sits inside nim-sds, on the producer side of the persistence boundary. It +runs synchronously inside `saveChannelMeta`, before the blob crosses to the +worker. 
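Whatever codec produces the blob, the "refuse unknown versions" rule from §1.2 can be sketched independently of it. The example below is an illustrative assumption, not the protobuf encoding: `encodeEnvelope`, `decodeEnvelope` and the 4-byte little-endian version prefix are hypothetical, chosen only to show a decoder that fails loudly instead of guessing at the layout.

```nim
const KnownSchemaVersion = 1'u32   # mirrors ChannelMetaSchemaVersion in §1.2

proc encodeEnvelope(version: uint32, payload: seq[byte]): seq[byte] =
  ## Prefixes the payload with the version as 4 little-endian bytes.
  for shift in [0, 8, 16, 24]:
    result.add byte((version shr shift) and 0xFF'u32)
  result.add payload

proc decodeEnvelope(blob: seq[byte], payload: var seq[byte]): bool =
  ## Returns false and leaves `payload` untouched for a short blob or an
  ## unknown version.
  if blob.len < 4:
    return false
  var version = 0'u32
  for i in 0 ..< 4:
    version = version or (uint32(blob[i]) shl (8 * i))
  if version != KnownSchemaVersion:
    return false
  payload = blob[4 .. ^1]
  true

var payload: seq[byte]
let knownOk = decodeEnvelope(
  encodeEnvelope(KnownSchemaVersion, @[byte 0xAA, 0xBB]), payload)
let futureOk = decodeEnvelope(
  encodeEnvelope(2'u32, @[byte 0x01]), payload)
echo knownOk, " ", payload.len   # true 2
echo futureOk                    # false
```

With protobuf the version would more likely live in a numbered field of the top-level message; the refusal logic on load stays the same.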
+ +**Choice: protobuf, reusing the existing toolchain.** +- `sds/protobuf.nim` is already a dependency and already encodes `SdsMessage` +- Field-number versioning composes naturally with the explicit `schemaVersion` +- Encoders for the new types compose on top of the existing `SdsMessage` one + — no new codec to maintain + +**Encoders to add:** +- `UnacknowledgedMessage` (wraps `SdsMessage` + `sendTime: int64` unix-ms + `resendAttempts: uint32`) +- `IncomingMessage` (wraps `SdsMessage` + `missingDeps: repeated bytes`) +- `OutgoingRepairEntry` / `IncomingRepairEntry` (HistoryEntry + Time + optional cachedMessage) +- `OutgoingRepairKV` / `IncomingRepairKV` (msgId + entry — flattened map; see §6) +- `ChannelMeta` (top-level) + +`Time` is serialized as `int64` unix milliseconds. The wall-clock semantics +are already used by the protocol itself (`getTime()` in `wrapOutgoingMessage`). + +**On durability.** Because the worker buffers blobs, `saveChannelMeta` +returning `ok()` means "the blob was accepted by the worker," not "the blob +is fsynced." That is the worker's contract to manage. nim-sds's own +invariant — one snapshot save per protocol op, after all in-memory mutation +completes — is satisfied as soon as the worker accepts the blob, because +on recovery the worker replays its own buffer in order, so the snapshot +nim-sds last issued is the snapshot nim-sds will see on next `loadChannel`. + +--- + +## 2. 
New `Persistence` Interface + +Replace the current 13 procs in `sds/types/persistence.nim` with **5**: + +```nim +type Persistence* = object + saveChannelMeta*: proc( + channelId: SdsChannelID, meta: ChannelMeta + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + + updateHistory*: proc( + channelId: SdsChannelID, update: HistoryUpdate + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + + loadChannel*: proc( + channelId: SdsChannelID + ): Future[Result[ChannelData, string]] {.async: (raises: []), gcsafe.} + + dropChannel*: proc( + channelId: SdsChannelID + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + + setRetrievalHint*: proc( + msgId: SdsMessageID, hint: seq[byte] + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} +``` + +### Atomicity contract (documented in the interface comment) + +> Backends SHOULD execute `saveChannelMeta` and the immediately following +> `updateHistory` call within a single transaction when both arrive together +> from the same protocol op. nim-sds always issues them back-to-back under +> the channel lock, with no `await`-of-other-work in between, so the backend +> can either (a) buffer `saveChannelMeta` until the next `updateHistory` or +> `flush`, or (b) use a `txn(channelId)` handle. Variant (b) is cleaner; see +> §3.2 for the optional `beginTxn`/`commitTxn` extension. + +### Backend assumption: schema-agnostic KV blob store + +The target backend is the existing schema-agnostic KV persistence module in +the sibling repo. It stores opaque `(key: bytes) → (value: bytes)` blobs with +its own crash-consistency guarantees. 
Therefore: + +- nim-sds owns the wire format end-to-end (no SQL schema to coordinate) +- The "single transaction per op" requirement reduces to "two KV puts per + op": `meta:` and `history::` (one or more) +- The backend's existing batch/atomicity primitives are what guarantee + crash consistency — nim-sds doesn't need transaction-handle plumbing + +--- + +## 3. Refactor Plan + +### Phase 0 — Pre-work (no behavior change) + +| Step | File(s) | Verify | +|------|---------|--------| +| 0.1 Add `ChannelMeta`, `HistoryUpdate`, `ChannelData` types | new `sds/types/channel_meta.nim`, `sds/types/history_update.nim` | `nimble c sds.nim` compiles | +| 0.2 Add protobuf encoders/decoders for new types | extend `sds/protobuf.nim` | round-trip unit tests | +| 0.3 Add `tests/test_snapshot_codec.nim` | new test file | `nimble test` passes; covers empty, single-entry, full-buffer, repair-heavy cases | + +### Phase 1 — New interface alongside old + +| Step | File(s) | Verify | +|------|---------|--------| +| 1.1 Add new 5-proc `Persistence` type as `PersistenceV2` (rename later) | `sds/types/persistence.nim` | compiles; old interface still works | +| 1.2 Add `noOpPersistenceV2()` for tests | same | `nimble test` passes | +| 1.3 Add `ReliabilityManager.persistenceV2` field, optional | `sds/types/reliability_manager.nim` | one of `persistence` / `persistenceV2` is in use; assert at construction | + +### Phase 2 — Migrate protocol ops, one at a time + +For each op, the pattern is: +1. Add a `dirty: bool` local accumulator +2. Replace inner `await rm.persistence.X` calls with in-memory mutation + set `dirty = true` +3. 
At the end of the op (under lock, before `return`), emit at most one `saveChannelMeta` and at most one `updateHistory` call + +Order (least risky → highest risk): + +| Step | Op | File:line | Verify | +|------|-----|-----------|--------| +| 2.1 | `runRepairSweep` | sds.nim:510 | repair sweep unit test, with failure injection | +| 2.2 | `checkUnacknowledgedMessages` | sds.nim:445 | resend-flow integration test | +| 2.3 | `processIncomingBuffer` → pure (no persistence) | sds.nim:176 | callers will persist; covered by 2.4/2.5 | +| 2.4 | `reviewAckStatus` → pure (no persistence) | sds.nim:36 | covered by 2.5 | +| 2.5 | `unwrapReceivedMessage` | sds.nim:235 | full receive-path tests (paths A/B/C); duplicate early-return must skip save | +| 2.6 | `wrapOutgoingMessage` | sds.nim:87 | send-path tests | +| 2.7 | `markDependenciesMet` | sds.nim:378 | dep-resolution tests | +| 2.8 | `addToHistory` → return appended/evicted lists instead of persisting | sds_utils.nim:81 | covered by 2.5/2.6/2.7 | +| 2.9 | `updateLamportTimestamp` → pure (no persistence) | sds_utils.nim:108 | covered | +| 2.10 | `getOrCreateChannel` use `loadChannel` | sds_utils.nim:289 | bootstrap unit test | +| 2.11 | `removeChannel`, `resetReliabilityManager` → `dropChannel` | sds_utils.nim, sds.nim | wipe tests | + +Each step is a small commit. After every step: `nimble test` + `gitnexus_detect_changes` to confirm scope. 
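The three-step pattern used for every op in this phase can be sketched as follows. This is a simplified synchronous illustration under stated assumptions: `Meta`, `SaveMeta` and `ackOp` are hypothetical names, and the real ops are async, hold the channel lock, and may also emit one `updateHistory` call.

```nim
type
  Meta = object
    outgoing: seq[string]             # ids of sent-but-unacked messages

  SaveMeta = proc(meta: Meta): bool   # hypothetical stand-in for saveChannelMeta

proc ackOp(meta: var Meta, acked: seq[string], save: SaveMeta): bool =
  ## Mutates memory freely, records whether anything changed, saves at most once.
  var dirty = false
  for id in acked:
    let idx = meta.outgoing.find(id)
    if idx >= 0:
      meta.outgoing.delete(idx)
      dirty = true
  if dirty:
    return save(meta)
  true

var saves = 0
let save: SaveMeta = proc(meta: Meta): bool =
  inc saves
  true

var meta = Meta(outgoing: @["m1", "m2"])
discard ackOp(meta, @["m1", "m2"], save)   # two mutations, one save
discard ackOp(meta, @["m9"], save)         # nothing matched, no save
echo saves                                 # 1
```

A call-count test of the kind planned for `tests/test_persistence_contract.nim` could use the same counting closure to assert the expected number of saves per op.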
+ +### Phase 3 — Remove the old interface + +| Step | File(s) | Verify | +|------|---------|--------| +| 3.1 Delete old 13-proc `Persistence` fields | `sds/types/persistence.nim` | compile fails on stragglers — fix | +| 3.2 Rename `PersistenceV2` → `Persistence` | all call sites | full test suite | +| 3.3 Delete `noOpPersistence` (old), keep `noOpPersistenceV2` as `noOpPersistence` | same | tests pass | +| 3.4 Update `library/` FFI thread to construct the new `Persistence` | `library/sds_thread/...` | FFI smoke test on macOS + Linux | +| 3.5 Update `Broker_FFI_API.md` and any docs referencing the old contract | docs | review | + +### Phase 4 — (removed) + +A reference backend is **not** part of this plan. The schema-agnostic KV +persistence module in the sibling repo is the production backend. Its +authors own the integration adapter that maps the 5 `Persistence` procs onto +KV puts/gets. nim-sds only needs to expose the interface and a working +`noOpPersistence` for its own tests. + +--- + +## 4. Risk Mitigation During Refactor + +| Risk | Mitigation | +|------|------------| +| Mid-refactor inconsistency (some ops on new interface, some on old) | Phase 2 keeps both interfaces wired — only one is active per RM via a constructor switch; integration tests run against both | +| Behavior change masked by passing tests | Add `tests/test_persistence_contract.nim` that asserts exact call count per protocol op (before vs after must match the table in `ANALYSIS_SNAPSHOT_SAVE_POINTS.md`) | +| Memory-first mutation pattern preserved by accident | Move *all* persistence calls to the end of the op, after the lock-held mutation block completes. The dirty flag is set *during* mutation; the save fires *after*. If save fails, the in-memory state is still the source of truth for the next op — but now there's only one possible point of divergence per op, not 10. 
| +| FFI thread breakage | Phase 3.4 is the FFI cutover; smoke test on both `--mm:refc` and `--mm:orc`, macOS and Linux, before declaring done. ASAN run on the FFI example. | +| Snapshot blob growth surprises | Add a `len()` log on `saveChannelMeta` for the first week of integration; fail-loud if any blob exceeds (configurable) 1 MB | + +--- + +## 5. Acceptance Criteria + +- [ ] All existing `nimble test` cases pass against the new interface +- [ ] New `tests/test_persistence_contract.nim` enforces exactly the call counts from `ANALYSIS_SNAPSHOT_SAVE_POINTS.md` §"Save Points" table +- [ ] New `tests/test_snapshot_codec.nim` round-trips every `ChannelMeta` variant +- [ ] Failure-injection test: kill persistence between `saveChannelMeta` and `updateHistory` → on restart, the manager loads a self-consistent snapshot (no orphan history entries; no dangling buffer references) +- [ ] FFI smoke (`liblogosdelivery`-style) runs clean on macOS+refc, macOS+orc, Linux+refc, Linux+orc +- [ ] `Broker_FFI_API.md` reflects the new contract +- [ ] Bench: snapshot save rate matches the predicted `S + R` (foreground) and ≤ 0.2/s/channel background floor (with dirty-guard) under a synthetic 50-msg/s workload +- [ ] Snapshot blob size on the bench workload matches the estimate in §7 within 2×; outliers logged + +--- + +## 6. Codec & flattening — where protobuf comes in + +### Codec choice + +The KV backend stores opaque blobs. The codec that produces the blob is +**internal to nim-sds**. Protobuf is the natural choice because: + +- The project already uses protobuf for the SDS wire format + (`sds/protobuf.nim` encodes `SdsMessage`). One codec, one toolchain. +- Field-number versioning gives forward/backward compatibility for free — + pairs naturally with the `schemaVersion` field. +- Repeated message fields encode efficiently and round-trip cleanly. 
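The round trip between an in-memory `Table` and a repeated KV field can be sketched in plain Nim, leaving the protobuf encoding itself out. `RepairEntry`, `RepairKV`, `flatten` and `rebuild` are hypothetical stand-ins, and sorting by key is an assumption added here to make the flattened order reproducible, since `Table` iteration order is not specified.

```nim
import std/[tables, algorithm]

type
  RepairEntry = object   # hypothetical stand-in for OutgoingRepairEntry
    attempts: int

  RepairKV = object
    messageId: string
    entry: RepairEntry

proc flatten(buf: Table[string, RepairEntry]): seq[RepairKV] =
  ## Emits one KV object per table entry, sorted by key for a stable order.
  for id, entry in buf:
    result.add RepairKV(messageId: id, entry: entry)
  result.sort(proc(a, b: RepairKV): int = cmp(a.messageId, b.messageId))

proc rebuild(items: seq[RepairKV]): Table[string, RepairEntry] =
  ## Decode-time inverse: iterate the seq and repopulate the table.
  for kv in items:
    result[kv.messageId] = kv.entry

let original = {"m2": RepairEntry(attempts: 1),
                "m1": RepairEntry(attempts: 3)}.toTable
let flat = flatten(original)
echo flat[0].messageId           # m1
echo rebuild(flat) == original   # true
```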
+ +Concretely: `ChannelMeta` is a top-level protobuf message; `saveChannelMeta` +serializes it to `seq[byte]` and the backend writes that under +`meta:`. On load, the backend returns the bytes; nim-sds +deserializes. + +### Why flatten `Table[Id, Entry]` to `seq[KV]` + +Protobuf's wire format has no first-class "map of bytes-key → message-value" +type in the minimal subset used by `sds/protobuf.nim` (the +`nim-libp2p`-style `minprotobuf`). Even the full proto3 `map` is +encoded on the wire as **repeated KV messages anyway** — the map syntax is +just sugar over `repeated Entry { key = 1; value = 2; }`. + +So flattening is making the wire shape explicit: + +``` +ChannelMeta { + ... + repeated OutgoingRepairKV outgoingRepairBuffer = 5; + repeated IncomingRepairKV incomingRepairBuffer = 6; +} + +OutgoingRepairKV { + bytes messageId = 1; + OutgoingRepairEntry entry = 2; +} +``` + +The `Table` exists only in memory; the wire and disk form is the flat seq. +Decode rebuilds the `Table` by iterating the seq. Cost: one alloc per entry +on encode/decode — negligible against the I/O it replaces. + +`outgoingBuffer` (already a `seq`) and `incomingBuffer` (a `Table` flattened +to `seq[IncomingMessage]` — the key is `message.messageId` so no separate KV +wrapper is needed) follow the same logic. + +--- + +## 7. 
Snapshot size estimates + +Assumptions (call out — every number below derives from these): + +| Quantity | Assumed bytes | Source | +|----------|---------------|--------| +| `SdsMessageID` | 32 | typical content-addressed id | +| `SdsParticipantID` | 32 | same | +| `SdsChannelID` | 32 | same | +| `bloomFilter` (serialized, in an `SdsMessage`) | 256 | derived from default `bloomFilterCapacity` × `errorRate` | +| `causalHistory` | 10 entries × ~40 B | `maxCausalHistory = 10` from `reliability_config.nim` | +| `repairRequest` in a wire SdsMessage | up to 3 × ~40 B | `maxRepairRequests = 3` | +| Application payload (`content`) — small | 100 B | typical short chat payload | +| Application payload — medium | 1 KB | richer payload | +| Protobuf framing | ~10% overhead | tag bytes + varints | + +**One `SdsMessage` on the wire (no content):** ~700 B +**One `SdsMessage` with 100 B content:** ~800 B +**One `SdsMessage` with 1 KB content:** ~1.7 KB + +Per-entry sizes inside `ChannelMeta`: + +| Entry | Size (100 B payload) | Size (1 KB payload) | Notes | +|-------|----------------------|---------------------|-------| +| `UnacknowledgedMessage` | ~820 B | ~1.7 KB | SdsMessage + sendTime + resendAttempts | +| `IncomingMessage` | ~950 B | ~1.9 KB | SdsMessage + missingDeps (avg 3 × 32 B) | +| `OutgoingRepairKV` | ~110 B | ~110 B | no cached message, payload-independent | +| `IncomingRepairKV` | ~920 B | ~1.8 KB | **cached serialized SdsMessage dominates** | + +Fixed overhead per `ChannelMeta`: ~30 B (schemaVersion + lamportTimestamp + framing). 
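The per-entry figures above combine linearly into a per-channel estimate. A minimal sketch, assuming the 100 B payload column and the ~30 B fixed overhead; the `BufferCounts` type and `estimateSnapshotBytes` proc are hypothetical names used only for this illustration, and the constants are the estimates from the table, not measurements.

```nim
# Hypothetical estimator: fixed overhead plus entry count times entry cost.
# Byte costs are the 100 B-payload estimates from the table above.
type
  BufferCounts = object
    outBuf, inBuf, outRepair, inRepair: int

const
  FixedOverhead = 30    # schemaVersion + lamportTimestamp + framing
  UnackedBytes = 820    # one UnacknowledgedMessage
  IncomingBytes = 950   # one IncomingMessage
  OutRepairBytes = 110  # one OutgoingRepairKV
  InRepairBytes = 920   # one IncomingRepairKV

proc estimateSnapshotBytes(c: BufferCounts): int =
  ## Estimated serialized ChannelMeta size in bytes.
  FixedOverhead +
    c.outBuf * UnackedBytes +
    c.inBuf * IncomingBytes +
    c.outRepair * OutRepairBytes +
    c.inRepair * InRepairBytes

when isMainModule:
  # 5 outgoing, 1 incoming, 1 entry in each repair buffer.
  let counts = BufferCounts(outBuf: 5, inBuf: 1, outRepair: 1, inRepair: 1)
  echo estimateSnapshotBytes(counts)  # 6110
```

With 5 outgoing entries and one entry in each other buffer, the sum is 30 + 4100 + 950 + 110 + 920 = 6110 B, roughly 6 KB.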
+ +### Per-channel snapshot size by load + +| Profile | outBuf | inBuf | outRepair | inRepair | Size (100 B payload) | Size (1 KB payload) | +|---------|--------|-------|-----------|----------|----------------------|---------------------| +| Idle | 0 | 0 | 0 | 0 | **~30 B** | ~30 B | +| Light chat | 2 | 0 | 0 | 0 | **~1.7 KB** | ~3.5 KB | +| Steady | 5 | 1 | 1 | 1 | **~6 KB** | ~12 KB | +| Busy | 10 | 3 | 3 | 3 | **~14 KB** | ~28 KB | +| Heavy, lossy network (SDS-R churning) | 30 | 10 | 20 | 10 | **~45 KB** | ~95 KB | +| Pathological (resend window full, big repair caches) | 50 | 20 | 30 | 20 | **~75 KB** | ~155 KB | + +### Where the bytes go + +| Load profile | Dominant contributor | +|--------------|----------------------| +| Idle / light | Fixed overhead + outgoingBuffer | +| Steady / busy | outgoingBuffer (each entry ~1 KB+) | +| Heavy / lossy | **incomingRepairBuffer** — each KV entry caches a full serialized message for rebroadcast. This is the single biggest amplifier; 20 entries with 1 KB payloads ≈ 36 KB on their own. | + +### Implications + +1. **Typical write is small (1–30 KB).** Comfortably foldable into the + per-op KV write cost; the backend's blob-write cost is bounded. +2. **`IncomingRepairEntry.cachedMessage` is the size lever to watch.** + Under heavy SDS-R activity it dominates the snapshot. If snapshot size + becomes a bottleneck, the optimization is to drop the cache from the + snapshot and re-serialize from `messageHistory` on demand — at the cost + of more CPU and the corner case where the requested message has been + evicted from history between snapshot save and repair sweep firing. +3. **Heavy profile (~95 KB) at the predicted 6/s/ch save rate = ~570 KB/s + per channel.** A 10-channel heavy node is then ~5.7 MB/s of snapshot + churn — well within KV backend throughput, but worth a real bench + before declaring it OK. +4. 
**The 1 MB hard cap** suggested in §4 stays appropriate; pathological + profile at 1 KB payload is ~155 KB, leaving healthy headroom. + +--- + +## 8. Persistence failure policy — non-fatal, best-effort + +**Change from current branch.** The current implementation treats every +`rePersistenceError` as fatal: the protocol op returns `err()`, the caller +sees a failure, and normal SDS operation breaks even though the in-memory +state is fine. This is wrong for the snapshot model. + +**New policy.** +- In-memory state is the **source of truth** for protocol correctness. + Lamport clock, buffers, history, bloom filter — all live in + `ChannelContext` and are mutated under the lock before any persistence + call. SDS message processing never depends on disk state for correctness + within a session. +- Persistence is **best-effort durability**. A failed `saveChannelMeta` or + `updateHistory` does **not** abort the operation, does not return `err` + to the FFI caller, and does not corrupt protocol semantics. The next op + will issue its own snapshot — if that succeeds, on-disk state is + re-synchronised; if it also fails, the one after that tries again. +- Snapshot writes are **idempotent and self-contained.** Each + `saveChannelMeta` blob is the complete current `ChannelMeta`. A missed + write is fully recovered by any later successful write — no log of + deltas to replay, no compensating action needed. +- Bootstrap loss tolerance: if `loadChannel` returns empty or stale state + on restart, the manager starts from whatever it could load (possibly + empty). Peer traffic and SDS-R repair will re-populate it. This is the + expected behaviour of the bloom-rebuilt-from-history design extended to + the meta blob. A hard `loadChannel` error is the exception: it is + surfaced to the caller, as listed under the operations that still + return `err`. + +**Implementation pattern.** At each save point: + +```nim +# end of wrapOutgoingMessage / unwrapReceivedMessage / etc.
+if dirty: + let saveRes = await rm.persistence.saveChannelMeta(channelId, snapshot) + if saveRes.isErr: + warn "snapshot save failed; in-memory state unaffected, next op will retry", + channelId = channelId, detail = saveRes.error + # DO NOT return err; protocol op succeeded. +if appended.len > 0 or evicted.len > 0: + let histRes = await rm.persistence.updateHistory(channelId, + HistoryUpdate(append: appended, evict: evicted)) + if histRes.isErr: + warn "history update failed; in-memory log authoritative, next op will retry", + channelId = channelId, detail = histRes.error +return ok(serializedMessage) # protocol op succeeded regardless +``` + +**What still returns `err(rePersistenceError)`.** Only operations whose +**semantic intent** is durability: +- `removeChannel`, `resetReliabilityManager` → must confirm `dropChannel` + succeeded; otherwise the caller may assume disk is clean when it isn't. +- `getOrCreateChannel` on first bootstrap → if `loadChannel` errors (vs. + returns empty), surface it so the caller can decide between "start + fresh in memory" and "abort init". + +**Impact on §5 acceptance criteria.** Add: failure-injection test must +prove that `wrapOutgoingMessage`, `unwrapReceivedMessage`, +`markDependenciesMet`, `checkUnacknowledgedMessages`, `runRepairSweep` all +return `ok` under 100%-failing persistence, with correct in-memory +behaviour and a recovered on-disk state after persistence is restored. + +**Why this is safe.** Each snapshot is a full self-contained blob; +partial-write divergence (the original ANALYSIS §4 critical risk) is +already eliminated by the atomic-blob design. Once that's true, treating +persistence failure as fatal is pure downside — it propagates a +recoverable I/O hiccup into a user-visible protocol failure for no +correctness gain. + +--- + +## 9. 
What this plan deliberately does NOT do + +- Does not add transaction handles — the KV backend's batch primitive is sufficient +- Does not ship a reference backend — the schema-agnostic KV module in the sibling repo is the production backend +- Does not change the bloom filter persistence policy (still rebuilt from history) +- Does not introduce SDS-R repair extension changes +- Does not touch the FFI surface shape beyond construction of `Persistence` — the existing C API is unchanged +- Does not auto-migrate on-disk data from an older format (no production data exists yet; schemaVersion=1 starts clean) diff --git a/sds.nimble b/sds.nimble index 09d19d5..7c5727a 100644 --- a/sds.nimble +++ b/sds.nimble @@ -68,6 +68,7 @@ task test, "Run the test suite": exec "nim c -r --outdir:build tests/test_bloom.nim" exec "nim c -r --outdir:build tests/test_reliability.nim" exec "nim c -r --outdir:build tests/test_persistence.nim" + exec "nim c -r --outdir:build tests/test_snapshot_codec.nim" task libsdsDynamicWindows, "Generate bindings": let outLibNameAndExt = "libsds.dll" diff --git a/sds/snapshot_codec.nim b/sds/snapshot_codec.nim new file mode 100644 index 0000000..7b626c6 --- /dev/null +++ b/sds/snapshot_codec.nim @@ -0,0 +1,326 @@ +## Storage encoding for the snapshot persistence types. +## +## This is the codec nim-sds runs on its side of the persistence boundary +## to turn a `ChannelMeta` (or `ChannelData`, or `HistoryUpdate`) into the +## opaque `seq[byte]` blob the KV persistence backend stores. The KV +## backend treats the blob as fully opaque. See PLAN_SNAPSHOT_PERSISTENCE.md +## §1.5 for why this codec exists at all and §6 for the choice of protobuf. +## +## This is NOT the SDS network wire format — that lives in `sds/protobuf.nim` +## and is unchanged. Encoders for `SdsMessage` and `HistoryEntry` are reused +## from there to avoid maintaining two codecs for the same shape. 
+ +{.push raises: [].} + +import std/[sets, times] +import libp2p/protobuf/minprotobuf +import ./types/[ + channel_meta, history_update, sds_message, sds_message_id, history_entry, + unacknowledged_message, incoming_message, repair_entry, reliability_error, +] +import ./protobufutil +import ./protobuf as wire + +export channel_meta, history_update + +# --------------------------------------------------------------------------- +# Time <-> int64 unix milliseconds +# --------------------------------------------------------------------------- +# The protocol uses `getTime()` (wall clock). For wire stability we encode +# as unix milliseconds in int64 (zigzag not needed — pre-1970 values do not +# occur in practice). Sub-millisecond precision is intentionally dropped: +# the protocol's repair backoff windows are seconds-scale. + +proc toUnixMs(t: Time): int64 = + t.toUnix * 1000'i64 + int64(t.nanosecond div 1_000_000) + +proc fromUnixMs(ms: int64): Time = + let secs = ms div 1000 + let nanos = (ms mod 1000).int * 1_000_000 + initTime(secs, nanos) + +# --------------------------------------------------------------------------- +# UnacknowledgedMessage +# --------------------------------------------------------------------------- + +proc encodeUnacked(u: UnacknowledgedMessage): ProtoBuffer = + var pb = initProtoBuffer() + let msgPb = wire.encode(u.message) + pb.write(1, msgPb.buffer) + pb.write(2, uint64(u.sendTime.toUnixMs)) + pb.write(3, uint32(u.resendAttempts)) + pb.finish() + pb + +proc decodeUnacked(buf: seq[byte]): ProtobufResult[UnacknowledgedMessage] = + let pb = initProtoBuffer(buf) + var msgBytes: seq[byte] + if not ?pb.getField(1, msgBytes): + return err(ProtobufError.missingRequiredField("UnacknowledgedMessage.message")) + let msg = SdsMessage.decode(msgBytes).valueOr: + return err(ProtobufError.missingRequiredField("UnacknowledgedMessage.message")) + var sendMs: uint64 + if not ?pb.getField(2, sendMs): + return 
err(ProtobufError.missingRequiredField("UnacknowledgedMessage.sendTime")) + var attempts: uint32 + discard pb.getField(3, attempts) + ok( + UnacknowledgedMessage.init( + message = msg, + sendTime = fromUnixMs(int64(sendMs)), + resendAttempts = int(attempts), + ) + ) + +# --------------------------------------------------------------------------- +# IncomingMessage +# --------------------------------------------------------------------------- + +proc encodeIncoming(m: IncomingMessage): ProtoBuffer = + var pb = initProtoBuffer() + let msgPb = wire.encode(m.message) + pb.write(1, msgPb.buffer) + for dep in m.missingDeps: + pb.write(2, dep) # SdsMessageID is string + pb.finish() + pb + +proc decodeIncoming(buf: seq[byte]): ProtobufResult[IncomingMessage] = + let pb = initProtoBuffer(buf) + var msgBytes: seq[byte] + if not ?pb.getField(1, msgBytes): + return err(ProtobufError.missingRequiredField("IncomingMessage.message")) + let msg = SdsMessage.decode(msgBytes).valueOr: + return err(ProtobufError.missingRequiredField("IncomingMessage.message")) + var deps: seq[SdsMessageID] + discard pb.getRepeatedField(2, deps) + var depSet = initHashSet[SdsMessageID]() + for d in deps: + depSet.incl(d) + ok(IncomingMessage.init(message = msg, missingDeps = depSet)) + +# --------------------------------------------------------------------------- +# OutgoingRepairEntry / OutgoingRepairKV +# --------------------------------------------------------------------------- + +proc encodeOutRepairEntry(e: OutgoingRepairEntry): ProtoBuffer = + var pb = initProtoBuffer() + let histPb = wire.encodeHistoryEntry(e.outHistEntry) + pb.write(1, histPb.buffer) + pb.write(2, uint64(e.minTimeRepairReq.toUnixMs)) + pb.finish() + pb + +proc decodeOutRepairEntry(buf: seq[byte]): ProtobufResult[OutgoingRepairEntry] = + let pb = initProtoBuffer(buf) + var histBytes: seq[byte] + if not ?pb.getField(1, histBytes): + return err(ProtobufError.missingRequiredField("OutgoingRepairEntry.outHistEntry")) + let histPb 
= initProtoBuffer(histBytes) + let entry = ?wire.decodeHistoryEntry(histPb) + var ms: uint64 + if not ?pb.getField(2, ms): + return err(ProtobufError.missingRequiredField("OutgoingRepairEntry.minTimeRepairReq")) + ok( + OutgoingRepairEntry.init( + outHistEntry = entry, minTimeRepairReq = fromUnixMs(int64(ms)) + ) + ) + +proc encodeOutRepairKV(kv: OutgoingRepairKV): ProtoBuffer = + var pb = initProtoBuffer() + pb.write(1, kv.messageId) + let entryPb = encodeOutRepairEntry(kv.entry) + pb.write(2, entryPb.buffer) + pb.finish() + pb + +proc decodeOutRepairKV(buf: seq[byte]): ProtobufResult[OutgoingRepairKV] = + let pb = initProtoBuffer(buf) + var msgId: SdsMessageID + if not ?pb.getField(1, msgId): + return err(ProtobufError.missingRequiredField("OutgoingRepairKV.messageId")) + var entryBytes: seq[byte] + if not ?pb.getField(2, entryBytes): + return err(ProtobufError.missingRequiredField("OutgoingRepairKV.entry")) + let entry = ?decodeOutRepairEntry(entryBytes) + ok(OutgoingRepairKV(messageId: msgId, entry: entry)) + +# --------------------------------------------------------------------------- +# IncomingRepairEntry / IncomingRepairKV +# --------------------------------------------------------------------------- + +proc encodeInRepairEntry(e: IncomingRepairEntry): ProtoBuffer = + var pb = initProtoBuffer() + let histPb = wire.encodeHistoryEntry(e.inHistEntry) + pb.write(1, histPb.buffer) + pb.write(2, e.cachedMessage) + pb.write(3, uint64(e.minTimeRepairResp.toUnixMs)) + pb.finish() + pb + +proc decodeInRepairEntry(buf: seq[byte]): ProtobufResult[IncomingRepairEntry] = + let pb = initProtoBuffer(buf) + var histBytes: seq[byte] + if not ?pb.getField(1, histBytes): + return err(ProtobufError.missingRequiredField("IncomingRepairEntry.inHistEntry")) + let histPb = initProtoBuffer(histBytes) + let entry = ?wire.decodeHistoryEntry(histPb) + var cached: seq[byte] + if not ?pb.getField(2, cached): + return 
err(ProtobufError.missingRequiredField("IncomingRepairEntry.cachedMessage")) + var ms: uint64 + if not ?pb.getField(3, ms): + return err(ProtobufError.missingRequiredField("IncomingRepairEntry.minTimeRepairResp")) + ok( + IncomingRepairEntry.init( + inHistEntry = entry, + cachedMessage = cached, + minTimeRepairResp = fromUnixMs(int64(ms)), + ) + ) + +proc encodeInRepairKV(kv: IncomingRepairKV): ProtoBuffer = + var pb = initProtoBuffer() + pb.write(1, kv.messageId) + let entryPb = encodeInRepairEntry(kv.entry) + pb.write(2, entryPb.buffer) + pb.finish() + pb + +proc decodeInRepairKV(buf: seq[byte]): ProtobufResult[IncomingRepairKV] = + let pb = initProtoBuffer(buf) + var msgId: SdsMessageID + if not ?pb.getField(1, msgId): + return err(ProtobufError.missingRequiredField("IncomingRepairKV.messageId")) + var entryBytes: seq[byte] + if not ?pb.getField(2, entryBytes): + return err(ProtobufError.missingRequiredField("IncomingRepairKV.entry")) + let entry = ?decodeInRepairEntry(entryBytes) + ok(IncomingRepairKV(messageId: msgId, entry: entry)) + +# --------------------------------------------------------------------------- +# ChannelMeta (top-level snapshot) +# --------------------------------------------------------------------------- + +proc encode*(meta: ChannelMeta): ProtoBuffer = + var pb = initProtoBuffer() + pb.write(1, meta.schemaVersion) + pb.write(2, uint64(meta.lamportTimestamp)) + for u in meta.outgoingBuffer: + let entryPb = encodeUnacked(u) + pb.write(3, entryPb.buffer) + for m in meta.incomingBuffer: + let entryPb = encodeIncoming(m) + pb.write(4, entryPb.buffer) + for kv in meta.outgoingRepairBuffer: + let entryPb = encodeOutRepairKV(kv) + pb.write(5, entryPb.buffer) + for kv in meta.incomingRepairBuffer: + let entryPb = encodeInRepairKV(kv) + pb.write(6, entryPb.buffer) + pb.finish() + pb + +proc decode*(T: type ChannelMeta, buf: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buf) + var meta = ChannelMeta.init() + + var ver: uint32 + if not 
?pb.getField(1, ver): + return err(ProtobufError.missingRequiredField("ChannelMeta.schemaVersion")) + if ver != ChannelMetaSchemaVersion: + # Per the contract: refuse loudly rather than silently truncating. + return err(ProtobufError.missingRequiredField( + "ChannelMeta.schemaVersion(unsupported)" + )) + meta.schemaVersion = ver + + var lts: uint64 + if not ?pb.getField(2, lts): + return err(ProtobufError.missingRequiredField("ChannelMeta.lamportTimestamp")) + meta.lamportTimestamp = int64(lts) + + var outBufs, inBufs, outRepBufs, inRepBufs: seq[seq[byte]] + discard pb.getRepeatedField(3, outBufs) + for b in outBufs: + meta.outgoingBuffer.add(?decodeUnacked(b)) + discard pb.getRepeatedField(4, inBufs) + for b in inBufs: + meta.incomingBuffer.add(?decodeIncoming(b)) + discard pb.getRepeatedField(5, outRepBufs) + for b in outRepBufs: + meta.outgoingRepairBuffer.add(?decodeOutRepairKV(b)) + discard pb.getRepeatedField(6, inRepBufs) + for b in inRepBufs: + meta.incomingRepairBuffer.add(?decodeInRepairKV(b)) + ok(meta) + +proc serialize*(meta: ChannelMeta): Result[seq[byte], ReliabilityError] = + ok(encode(meta).buffer) + +proc deserializeChannelMeta*( + data: seq[byte] +): Result[ChannelMeta, ReliabilityError] = + let m = ChannelMeta.decode(data).valueOr: + return err(ReliabilityError.reDeserializationError) + ok(m) + +# --------------------------------------------------------------------------- +# ChannelData (bootstrap payload) +# --------------------------------------------------------------------------- + +proc encode*(d: ChannelData): ProtoBuffer = + var pb = initProtoBuffer() + let metaPb = encode(d.meta) + pb.write(1, metaPb.buffer) + for m in d.messageHistory: + let msgPb = wire.encode(m) + pb.write(2, msgPb.buffer) + pb.finish() + pb + +proc decode*(T: type ChannelData, buf: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buf) + var d = ChannelData.init() + var metaBytes: seq[byte] + if not ?pb.getField(1, metaBytes): + return 
err(ProtobufError.missingRequiredField("ChannelData.meta")) + d.meta = ?ChannelMeta.decode(metaBytes) + var histBufs: seq[seq[byte]] + discard pb.getRepeatedField(2, histBufs) + for b in histBufs: + let m = SdsMessage.decode(b).valueOr: + return err(ProtobufError.missingRequiredField("ChannelData.messageHistory[i]")) + d.messageHistory.add(m) + ok(d) + +# --------------------------------------------------------------------------- +# HistoryUpdate +# --------------------------------------------------------------------------- + +proc encode*(u: HistoryUpdate): ProtoBuffer = + var pb = initProtoBuffer() + for m in u.append: + let msgPb = wire.encode(m) + pb.write(1, msgPb.buffer) + for id in u.evict: + pb.write(2, id) + pb.finish() + pb + +proc decode*(T: type HistoryUpdate, buf: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buf) + var u = HistoryUpdate.init() + var appBufs: seq[seq[byte]] + discard pb.getRepeatedField(1, appBufs) + for b in appBufs: + let m = SdsMessage.decode(b).valueOr: + return err(ProtobufError.missingRequiredField("HistoryUpdate.append[i]")) + u.append.add(m) + var ev: seq[SdsMessageID] + discard pb.getRepeatedField(2, ev) + u.evict = ev + ok(u) + +{.pop.} diff --git a/sds/types/channel_meta.nim b/sds/types/channel_meta.nim new file mode 100644 index 0000000..0d1f1fb --- /dev/null +++ b/sds/types/channel_meta.nim @@ -0,0 +1,82 @@ +## Atomic snapshot types for the per-channel protocol state. +## +## These types replace the fine-grained mutation operations of the original +## Persistence interface with a single self-contained blob per channel. +## See PLAN_SNAPSHOT_PERSISTENCE.md §1 for the rationale, §6 for the codec +## choice, §7 for size estimates. +## +## Bloom filter is intentionally absent — rebuilt from the message log on +## bootstrap. Message history is also absent — persisted separately via +## `HistoryUpdate` because it is large and append-mostly. 
+ +import ./sds_message_id +import ./sds_message +import ./unacknowledged_message +import ./incoming_message +import ./repair_entry +export + sds_message_id, sds_message, unacknowledged_message, incoming_message, + repair_entry + +const ChannelMetaSchemaVersion* = 1'u32 + ## On-disk format version for ChannelMeta. Decoders MUST refuse to load a + ## blob whose version they don't know how to interpret, rather than + ## silently truncating or zero-filling unknown fields. + +type + OutgoingRepairKV* = object + ## Flattened (key, value) entry from the in-memory + ## `outgoingRepairBuffer: Table[SdsMessageID, OutgoingRepairEntry]`. + ## Protobuf has no first-class map type in the minprotobuf subset we + ## use; even proto3 `map` is wire-encoded as repeated KV messages. + ## Flattening to a `seq[KV]` makes that shape explicit. + messageId*: SdsMessageID + entry*: OutgoingRepairEntry + + IncomingRepairKV* = object + ## Flattened (key, value) entry from + ## `incomingRepairBuffer: Table[SdsMessageID, IncomingRepairEntry]`. + messageId*: SdsMessageID + entry*: IncomingRepairEntry + + ChannelMeta* = object + ## Atomic snapshot of the fast-changing per-channel protocol state. + ## Persisted as one blob per `saveChannelMeta` call. The `Table`-backed + ## buffers in `ChannelContext` are flattened to `seq`s here for stable + ## serialization and deterministic ordering on disk. + schemaVersion*: uint32 + lamportTimestamp*: int64 + outgoingBuffer*: seq[UnacknowledgedMessage] + ## Sent-but-not-yet-acked. Order matches insertion order in + ## ChannelContext.outgoingBuffer; preserved on save/load. + incomingBuffer*: seq[IncomingMessage] + ## Received-but-not-yet-deliverable; key in memory is + ## `message.messageId`, so no KV wrapper is needed. + outgoingRepairBuffer*: seq[OutgoingRepairKV] + incomingRepairBuffer*: seq[IncomingRepairKV] + + ChannelData* = object + ## Returned by `loadChannel` on `getOrCreateChannel` bootstrap. 
+ ## Carries everything needed to rebuild the in-memory `ChannelContext` + ## from a clean restart. + meta*: ChannelMeta + messageHistory*: seq[SdsMessage] + ## MUST be ordered oldest-first (lamportTimestamp ASC, tie-break + ## msg_id ASC). Bloom filter is rebuilt from this on load; FIFO + ## eviction at maxMessageHistory relies on this ordering. Backend + ## contract; the loader SHOULD validate. + +proc init*(T: type ChannelMeta): T = + ## Empty snapshot with current schema version. Used as the bootstrap + ## payload when no on-disk state exists for a channel. + T( + schemaVersion: ChannelMetaSchemaVersion, + lamportTimestamp: 0, + outgoingBuffer: @[], + incomingBuffer: @[], + outgoingRepairBuffer: @[], + incomingRepairBuffer: @[], + ) + +proc init*(T: type ChannelData): T = + T(meta: ChannelMeta.init(), messageHistory: @[]) diff --git a/sds/types/history_update.nim b/sds/types/history_update.nim new file mode 100644 index 0000000..422bbef --- /dev/null +++ b/sds/types/history_update.nim @@ -0,0 +1,30 @@ +## Combined append/evict payload for the persistence message log. +## +## One protocol operation may deliver multiple messages in sequence (e.g. +## `unwrapReceivedMessage` followed by a `processIncomingBuffer` cascade +## that unblocks several buffered messages), and may also evict the oldest +## entries past `maxMessageHistory` in the same operation. Bundling all of +## those into a single `HistoryUpdate` lets the persistence backend execute +## the append + evict as one atomic batch alongside the matching +## `saveChannelMeta` call. + +import ./sds_message_id +import ./sds_message +export sds_message_id, sds_message + +type HistoryUpdate* = object + ## When BOTH `append` and `evict` are empty, callers SHOULD skip the + ## persistence call entirely. The Persistence interface treats an + ## "empty" update as a no-op but the round-trip is not free. + append*: seq[SdsMessage] + ## New delivered messages, in delivery order. 
Order matters for the + ## backend's append-only log; nim-sds preserves causal ordering when + ## populating this list. + evict*: seq[SdsMessageID] + ## Oldest messages now past `maxMessageHistory`. Backend deletes by id. + +proc init*(T: type HistoryUpdate): T = + T(append: @[], evict: @[]) + +proc isEmpty*(u: HistoryUpdate): bool = + u.append.len == 0 and u.evict.len == 0 diff --git a/tests/test_snapshot_codec.nim b/tests/test_snapshot_codec.nim new file mode 100644 index 0000000..be52365 --- /dev/null +++ b/tests/test_snapshot_codec.nim @@ -0,0 +1,235 @@ +## Round-trip tests for the snapshot persistence codec. +## Each `encode` → `decode` cycle must preserve every field exactly. + +import std/[times, sets, unittest] +import results +import ../sds/snapshot_codec +import + ../sds/types/[ + sds_message, sds_message_id, history_entry, unacknowledged_message, + incoming_message, repair_entry, + ] + +converter toParticipantID(s: string): SdsParticipantID = + s.SdsParticipantID + +proc mkMsg(id: string, ts: int64 = 1, content: seq[byte] = @[]): SdsMessage = + SdsMessage.init( + messageId = id, + lamportTimestamp = ts, + causalHistory = @[], + channelId = "chan", + content = content, + bloomFilter = @[], + senderId = "alice", + repairRequest = @[], + ) + +proc mkHistEntry(id: string): HistoryEntry = + HistoryEntry.init(messageId = id, senderId = "alice") + +suite "snapshot codec — ChannelMeta": + test "empty meta round-trips": + let m = ChannelMeta.init() + let buf = encode(m).buffer + let dec = ChannelMeta.decode(buf).get() + check: + dec.schemaVersion == ChannelMetaSchemaVersion + dec.lamportTimestamp == 0 + dec.outgoingBuffer.len == 0 + dec.incomingBuffer.len == 0 + dec.outgoingRepairBuffer.len == 0 + dec.incomingRepairBuffer.len == 0 + + test "meta with lamport and single outgoing entry": + var m = ChannelMeta.init() + m.lamportTimestamp = 42 + m.outgoingBuffer.add( + UnacknowledgedMessage.init( + message = mkMsg("m1", 42, @[1.byte, 2, 3]), + sendTime = 
fromUnix(1_700_000_000), + resendAttempts = 2, + ) + ) + let buf = encode(m).buffer + let dec = ChannelMeta.decode(buf).get() + check: + dec.lamportTimestamp == 42 + dec.outgoingBuffer.len == 1 + dec.outgoingBuffer[0].message.messageId == "m1" + dec.outgoingBuffer[0].message.content == @[1.byte, 2, 3] + dec.outgoingBuffer[0].resendAttempts == 2 + dec.outgoingBuffer[0].sendTime.toUnix == 1_700_000_000 + + test "meta with incoming entry carrying missing deps": + var m = ChannelMeta.init() + var deps = initHashSet[SdsMessageID]() + deps.incl("dep1") + deps.incl("dep2") + m.incomingBuffer.add( + IncomingMessage.init(message = mkMsg("m2"), missingDeps = deps) + ) + let buf = encode(m).buffer + let dec = ChannelMeta.decode(buf).get() + check: + dec.incomingBuffer.len == 1 + dec.incomingBuffer[0].message.messageId == "m2" + dec.incomingBuffer[0].missingDeps == deps + + test "meta with both repair buffers populated": + var m = ChannelMeta.init() + m.outgoingRepairBuffer.add( + OutgoingRepairKV( + messageId: "missing1", + entry: OutgoingRepairEntry.init( + outHistEntry = mkHistEntry("missing1"), + minTimeRepairReq = fromUnix(1_700_000_100), + ), + ) + ) + m.incomingRepairBuffer.add( + IncomingRepairKV( + messageId: "requested1", + entry: IncomingRepairEntry.init( + inHistEntry = mkHistEntry("requested1"), + cachedMessage = @[9.byte, 8, 7, 6], + minTimeRepairResp = fromUnix(1_700_000_200), + ), + ) + ) + let buf = encode(m).buffer + let dec = ChannelMeta.decode(buf).get() + check: + dec.outgoingRepairBuffer.len == 1 + dec.outgoingRepairBuffer[0].messageId == "missing1" + dec.outgoingRepairBuffer[0].entry.minTimeRepairReq.toUnix == + 1_700_000_100 + dec.incomingRepairBuffer.len == 1 + dec.incomingRepairBuffer[0].messageId == "requested1" + dec.incomingRepairBuffer[0].entry.cachedMessage == @[9.byte, 8, 7, 6] + dec.incomingRepairBuffer[0].entry.minTimeRepairResp.toUnix == + 1_700_000_200 + + test "fully-populated meta — multiple entries each buffer": + var m = 
ChannelMeta.init() + m.lamportTimestamp = 999 + for i in 0 ..< 5: + m.outgoingBuffer.add( + UnacknowledgedMessage.init( + message = mkMsg("o" & $i, int64(i), @[byte(i)]), + sendTime = fromUnix(1_700_000_000 + i.int64), + resendAttempts = i, + ) + ) + for i in 0 ..< 3: + var deps = initHashSet[SdsMessageID]() + deps.incl("dep" & $i) + m.incomingBuffer.add( + IncomingMessage.init( + message = mkMsg("i" & $i, int64(100 + i)), missingDeps = deps + ) + ) + for i in 0 ..< 4: + m.outgoingRepairBuffer.add( + OutgoingRepairKV( + messageId: "or" & $i, + entry: OutgoingRepairEntry.init( + outHistEntry = mkHistEntry("or" & $i), + minTimeRepairReq = fromUnix(1_700_000_300 + i.int64), + ), + ) + ) + for i in 0 ..< 2: + m.incomingRepairBuffer.add( + IncomingRepairKV( + messageId: "ir" & $i, + entry: IncomingRepairEntry.init( + inHistEntry = mkHistEntry("ir" & $i), + cachedMessage = @[byte(i), byte(i + 1)], + minTimeRepairResp = fromUnix(1_700_000_400 + i.int64), + ), + ) + ) + let buf = encode(m).buffer + let dec = ChannelMeta.decode(buf).get() + check: + dec.lamportTimestamp == 999 + dec.outgoingBuffer.len == 5 + dec.incomingBuffer.len == 3 + dec.outgoingRepairBuffer.len == 4 + dec.incomingRepairBuffer.len == 2 + dec.outgoingBuffer[4].message.messageId == "o4" + dec.outgoingBuffer[4].resendAttempts == 4 + dec.outgoingRepairBuffer[3].messageId == "or3" + dec.incomingRepairBuffer[1].entry.cachedMessage == @[1.byte, 2] + + test "decoder rejects unknown schemaVersion": + var m = ChannelMeta.init() + m.schemaVersion = 999'u32 + let buf = encode(m).buffer + check ChannelMeta.decode(buf).isErr + +suite "snapshot codec — ChannelData": + test "empty channel data round-trips": + let d = ChannelData.init() + let buf = encode(d).buffer + let dec = ChannelData.decode(buf).get() + check: + dec.meta.schemaVersion == ChannelMetaSchemaVersion + dec.messageHistory.len == 0 + + test "channel data with meta and history preserves order": + var d = ChannelData.init() + d.meta.lamportTimestamp = 17 + 
d.messageHistory.add(mkMsg("h1", 1)) + d.messageHistory.add(mkMsg("h2", 2)) + d.messageHistory.add(mkMsg("h3", 3)) + let buf = encode(d).buffer + let dec = ChannelData.decode(buf).get() + check: + dec.meta.lamportTimestamp == 17 + dec.messageHistory.len == 3 + dec.messageHistory[0].messageId == "h1" + dec.messageHistory[1].messageId == "h2" + dec.messageHistory[2].messageId == "h3" + +suite "snapshot codec — HistoryUpdate": + test "empty update reports isEmpty (callers skip persistence)": + # By contract (HistoryUpdate doc): when both append and evict are + # empty, callers SHOULD skip the persistence call entirely. The codec + # is not required to round-trip an empty update — minprotobuf's + # finish() refuses an empty buffer, by design. + let u = HistoryUpdate.init() + check u.isEmpty + + test "append-only update": + var u = HistoryUpdate.init() + u.append.add(mkMsg("a1")) + u.append.add(mkMsg("a2")) + let buf = encode(u).buffer + let dec = HistoryUpdate.decode(buf).get() + check: + dec.append.len == 2 + dec.append[0].messageId == "a1" + dec.append[1].messageId == "a2" + dec.evict.len == 0 + + test "evict-only update": + var u = HistoryUpdate.init() + u.evict = @["e1", "e2", "e3"] + let buf = encode(u).buffer + let dec = HistoryUpdate.decode(buf).get() + check: + dec.append.len == 0 + dec.evict == @["e1", "e2", "e3"] + + test "mixed append + evict update": + var u = HistoryUpdate.init() + u.append.add(mkMsg("new")) + u.evict = @["old1", "old2"] + let buf = encode(u).buffer + let dec = HistoryUpdate.decode(buf).get() + check: + dec.append.len == 1 + dec.append[0].messageId == "new" + dec.evict == @["old1", "old2"]