diff --git a/AGENTS.md b/AGENTS.md index 84d8372..ff1cb75 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,7 +1,7 @@ # GitNexus — Code Intelligence -This project is indexed by GitNexus as **nim-sds** (889 symbols, 1437 relationships, 45 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. +This project is indexed by GitNexus as **nim-sds** (1100 symbols, 1820 relationships, 66 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. > If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first. @@ -40,4 +40,4 @@ This project is indexed by GitNexus as **nim-sds** (889 symbols, 1437 relationsh | Tools, resources, schema reference | `.claude/skills/gitnexus/gitnexus-guide/SKILL.md` | | Index, status, clean, wiki CLI commands | `.claude/skills/gitnexus/gitnexus-cli/SKILL.md` | - \ No newline at end of file + diff --git a/CLAUDE.md b/CLAUDE.md index 1df91d7..94736b6 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -166,7 +166,7 @@ If using Nix, also recalculate the fixed-output hash in `nix/deps.nix` after upd # GitNexus — Code Intelligence -This project is indexed by GitNexus as **nim-sds** (889 symbols, 1437 relationships, 45 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. +This project is indexed by GitNexus as **nim-sds** (1100 symbols, 1820 relationships, 66 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. > If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first. diff --git a/doc/PLAN_SNAPSHOT_PERSISTENCE.md b/doc/PLAN_SNAPSHOT_PERSISTENCE.md new file mode 100644 index 0000000..b66d0fa --- /dev/null +++ b/doc/PLAN_SNAPSHOT_PERSISTENCE.md @@ -0,0 +1,502 @@ +# SDS Snapshot Persistence — Design & Refactor Plan + +Companion to `ANALYSIS_SDS_PERSISTENCE.md` (problem statement) and +`ANALYSIS_SNAPSHOT_SAVE_POINTS.md` (where & how often we save). + +This document defines: +1. **Data structures** to be persisted (snapshot + history) +2. **New `Persistence` interface** (5 procs replacing the current 13) +3. **Refactor plan** — phased, test-gated, backward-compatible interim state + +--- + +## 1. Data Structure Design + +### 1.1 Design principles + +| Principle | Reason | +|-----------|--------| +| Snapshot is **one atomic blob** | Eliminates partial-write divergence (the root cause from ANALYSIS_SDS_PERSISTENCE.md §4) | +| Snapshot is **small** (buffers only, no history) | Keeps per-op write cost ≤ a few KB; foldable into one SQLite txn | +| History is **separate, append-batched** | Large data, append-mostly, queryable by msg_id for SDS-R | +| Bloom filter is **not persisted** | Already the case — rebuilt from history on bootstrap | +| **Versioned wire format** | Allow future schema evolution without breaking on-disk data | +| **Protobuf** serialization | Project already uses it (`sds/protobuf.nim`); keeps one codec | + +### 1.2 `ChannelMeta` — the snapshot payload + +```nim +# sds/types/channel_meta.nim (new file) + +import std/[tables, times] +import ./sds_message_id +import ./unacknowledged_message +import ./incoming_message +import ./repair_entry +export + sds_message_id, unacknowledged_message, incoming_message, repair_entry + +const ChannelMetaSchemaVersion* = 1'u32 + +type ChannelMeta* = object + ## Atomic snapshot of the fast-changing per-channel protocol state. + ## Persisted as one blob per `saveChannelMeta` call. Bloom filter is + ## intentionally absent — rebuilt from the message log on bootstrap. + ## Message history is also absent — persisted separately via `updateHistory` + ## because it is large and append-mostly. + schemaVersion*: uint32 + ## On-disk format version. Backends MUST refuse to load a meta whose + ## version they don't know how to decode rather than silently truncating + ## or zero-filling unknown fields. + + lamportTimestamp*: int64 + + outgoingBuffer*: seq[UnacknowledgedMessage] + ## Sent-but-not-yet-acked messages. Order matters: the protocol iterates + ## in insertion order for resend-attempt accounting. + + incomingBuffer*: seq[IncomingMessage] + ## Received-but-not-yet-deliverable messages, each carrying its + ## still-missing dependency set. Order is irrelevant; flattened from + ## the in-memory `Table` for wire-friendliness. + + outgoingRepairBuffer*: seq[OutgoingRepairKV] + incomingRepairBuffer*: seq[IncomingRepairKV] + ## SDS-R repair buffers, flattened from in-memory `Table` to seq of + ## (key, value) for stable serialization. + +type + OutgoingRepairKV* = object + messageId*: SdsMessageID + entry*: OutgoingRepairEntry + + IncomingRepairKV* = object + messageId*: SdsMessageID + entry*: IncomingRepairEntry +``` + +**Why flatten the `Table`s to `seq`s?** +Protobuf has no native map of `SdsMessageID → object`. Flattening to `seq` of KV +objects gives deterministic encoding and trivial decode-time rebuild of the +in-memory `Table`. The cost is one extra alloc per entry on encode/decode — +negligible vs. the I/O it replaces. + +**Why an explicit `schemaVersion`?** +The current interface has no version field. Adding fields later (e.g., a new +SDS-R counter) silently truncates old data on load. The version makes +incompatibility explicit; backends fail loud instead of corrupting state. + +### 1.3 `HistoryAppend` — the history-write payload + +```nim +# extension to sds/types/persistence.nim or new history_update.nim + +type HistoryUpdate* = object + ## Combined append/evict for one protocol operation. Empty `append` and + ## empty `evict` ⇒ caller should skip the call entirely. + append*: seq[SdsMessage] + ## New delivered messages, in delivery order (matters for SDS-R retrieval + ## hint correctness and FIFO eviction on the backend side). + evict*: seq[SdsMessageID] + ## Oldest messages now past `maxMessageHistory`. Backend deletes by id. +``` + +`append` is a `seq` (not a single `SdsMessage`) because `processIncomingBuffer` +can deliver a chain of unblocked messages in one call to the parent op +(`unwrapReceivedMessage` / `markDependenciesMet`). Sending them all in one +`updateHistory` call keeps the "one save per protocol op" guarantee. + +### 1.4 `ChannelData` — the bootstrap payload + +```nim +type ChannelData* = object + ## Returned by `loadChannel` on `getOrCreateChannel` bootstrap. + ## Carries everything needed to rebuild the in-memory `ChannelContext` + ## from a clean restart. + meta*: ChannelMeta + messageHistory*: seq[SdsMessage] + ## MUST be ordered oldest-first (lamportTimestamp ASC, tie-break msg_id + ## ASC). Bloom filter is rebuilt from this on load; FIFO eviction relies + ## on this ordering. Backend contract; validated by nim-sds on load. +``` + +### 1.5 Storage encoding (internal to nim-sds — not the SDS network wire format) + +**Disambiguation.** The SDS **network** wire format (bytes peers exchange) is +handled by the existing `sds/protobuf.nim` and is untouched by this plan. +What this section defines is the **storage** encoding: the codec nim-sds uses +to turn a `ChannelMeta` Nim object into the opaque `seq[byte]` blob it hands +to `saveChannelMeta`. The KV persistence worker treats that blob as +fully opaque — it stores `(key: bytes) → (value: bytes)` and does its own +buffering/batching of writes. Whether nim-sds uses protobuf, CBOR, or +anything else is invisible to the worker. + +**Why this codec exists at all.** The worker stores bytes; something must +produce those bytes from the in-memory `ChannelMeta`. That responsibility +sits inside nim-sds, on the producer side of the persistence boundary. It +runs synchronously inside `saveChannelMeta`, before the blob crosses to the +worker. + +**Choice: protobuf, reusing the existing toolchain.** +- `sds/protobuf.nim` is already a dependency and already encodes `SdsMessage` +- Field-number versioning composes naturally with the explicit `schemaVersion` +- Encoders for the new types compose on top of the existing `SdsMessage` one + — no new codec to maintain + +**Encoders to add:** +- `UnacknowledgedMessage` (wraps `SdsMessage` + `sendTime: int64` unix-ms + `resendAttempts: uint32`) +- `IncomingMessage` (wraps `SdsMessage` + `missingDeps: repeated bytes`) +- `OutgoingRepairEntry` / `IncomingRepairEntry` (HistoryEntry + Time + optional cachedMessage) +- `OutgoingRepairKV` / `IncomingRepairKV` (msgId + entry — flattened map; see §6) +- `ChannelMeta` (top-level) + +`Time` is serialized as `int64` unix milliseconds. The wall-clock semantics +are already used by the protocol itself (`getTime()` in `wrapOutgoingMessage`). + +**On durability.** Because the worker buffers blobs, `saveChannelMeta` +returning `ok()` means "the blob was accepted by the worker," not "the blob +is fsynced." That is the worker's contract to manage. nim-sds's own +invariant — one snapshot save per protocol op, after all in-memory mutation +completes — is satisfied as soon as the worker accepts the blob, because +on recovery the worker replays its own buffer in order, so the snapshot +nim-sds last issued is the snapshot nim-sds will see on next `loadChannel`. + +--- + +## 2. New `Persistence` Interface + +Replace the current 13 procs in `sds/types/persistence.nim` with **5**: + +```nim +type Persistence* = object + saveChannelMeta*: proc( + channelId: SdsChannelID, meta: ChannelMeta + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + + updateHistory*: proc( + channelId: SdsChannelID, update: HistoryUpdate + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + + loadChannel*: proc( + channelId: SdsChannelID + ): Future[Result[ChannelData, string]] {.async: (raises: []), gcsafe.} + + dropChannel*: proc( + channelId: SdsChannelID + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + + setRetrievalHint*: proc( + msgId: SdsMessageID, hint: seq[byte] + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} +``` + +### Atomicity contract (documented in the interface comment) + +> Backends SHOULD execute `saveChannelMeta` and the immediately following +> `updateHistory` call within a single transaction when both arrive together +> from the same protocol op. nim-sds always issues them back-to-back under +> the channel lock, with no `await`-of-other-work in between, so the backend +> can either (a) buffer `saveChannelMeta` until the next `updateHistory` or +> `flush`, or (b) use a `txn(channelId)` handle. Variant (b) is cleaner; see +> §3.2 for the optional `beginTxn`/`commitTxn` extension. + +### Backend assumption: schema-agnostic KV blob store + +The target backend is the existing schema-agnostic KV persistence module in +the sibling repo. It stores opaque `(key: bytes) → (value: bytes)` blobs with +its own crash-consistency guarantees. Therefore: + +- nim-sds owns the wire format end-to-end (no SQL schema to coordinate) +- The "single transaction per op" requirement reduces to "two KV puts per + op": `meta:` and `history::` (one or more) +- The backend's existing batch/atomicity primitives are what guarantee + crash consistency — nim-sds doesn't need transaction-handle plumbing + +--- + +## 3. Refactor Plan + +### Phase 0 — Pre-work (no behavior change) + +| Step | File(s) | Verify | +|------|---------|--------| +| 0.1 Add `ChannelMeta`, `HistoryUpdate`, `ChannelData` types | new `sds/types/channel_meta.nim`, `sds/types/history_update.nim` | `nimble c sds.nim` compiles | +| 0.2 Add protobuf encoders/decoders for new types | extend `sds/protobuf.nim` | round-trip unit tests | +| 0.3 Add `tests/test_snapshot_codec.nim` | new test file | `nimble test` passes; covers empty, single-entry, full-buffer, repair-heavy cases | + +### Phase 1 — New interface alongside old + +| Step | File(s) | Verify | +|------|---------|--------| +| 1.1 Add new 5-proc `Persistence` type as `PersistenceV2` (rename later) | `sds/types/persistence.nim` | compiles; old interface still works | +| 1.2 Add `noOpPersistenceV2()` for tests | same | `nimble test` passes | +| 1.3 Add `ReliabilityManager.persistenceV2` field, optional | `sds/types/reliability_manager.nim` | one of `persistence` / `persistenceV2` is in use; assert at construction | + +### Phase 2 — Migrate protocol ops, one at a time + +For each op, the pattern is: +1. Add a `dirty: bool` local accumulator +2. Replace inner `await rm.persistence.X` calls with in-memory mutation + set `dirty = true` +3. At the end of the op (under lock, before `return`), emit at most one `saveChannelMeta` and at most one `updateHistory` call + +Order (least risky → highest risk): + +| Step | Op | File:line | Verify | +|------|-----|-----------|--------| +| 2.1 | `runRepairSweep` | sds.nim:510 | repair sweep unit test, with failure injection | +| 2.2 | `checkUnacknowledgedMessages` | sds.nim:445 | resend-flow integration test | +| 2.3 | `processIncomingBuffer` → pure (no persistence) | sds.nim:176 | callers will persist; covered by 2.4/2.5 | +| 2.4 | `reviewAckStatus` → pure (no persistence) | sds.nim:36 | covered by 2.5 | +| 2.5 | `unwrapReceivedMessage` | sds.nim:235 | full receive-path tests (paths A/B/C); duplicate early-return must skip save | +| 2.6 | `wrapOutgoingMessage` | sds.nim:87 | send-path tests | +| 2.7 | `markDependenciesMet` | sds.nim:378 | dep-resolution tests | +| 2.8 | `addToHistory` → return appended/evicted lists instead of persisting | sds_utils.nim:81 | covered by 2.5/2.6/2.7 | +| 2.9 | `updateLamportTimestamp` → pure (no persistence) | sds_utils.nim:108 | covered | +| 2.10 | `getOrCreateChannel` use `loadChannel` | sds_utils.nim:289 | bootstrap unit test | +| 2.11 | `removeChannel`, `resetReliabilityManager` → `dropChannel` | sds_utils.nim, sds.nim | wipe tests | + +Each step is a small commit. After every step: `nimble test` + `gitnexus_detect_changes` to confirm scope. + +### Phase 3 — Remove the old interface + +| Step | File(s) | Verify | +|------|---------|--------| +| 3.1 Delete old 13-proc `Persistence` fields | `sds/types/persistence.nim` | compile fails on stragglers — fix | +| 3.2 Rename `PersistenceV2` → `Persistence` | all call sites | full test suite | +| 3.3 Delete `noOpPersistence` (old), keep `noOpPersistenceV2` as `noOpPersistence` | same | tests pass | +| 3.4 Update `library/` FFI thread to construct the new `Persistence` | `library/sds_thread/...` | FFI smoke test on macOS + Linux | +| 3.5 Update `Broker_FFI_API.md` and any docs referencing the old contract | docs | review | + +### Phase 4 — (removed) + +A reference backend is **not** part of this plan. The schema-agnostic KV +persistence module in the sibling repo is the production backend. Its +authors own the integration adapter that maps the 5 `Persistence` procs onto +KV puts/gets. nim-sds only needs to expose the interface and a working +`noOpPersistence` for its own tests. + +--- + +## 4. Risk Mitigation During Refactor + +| Risk | Mitigation | +|------|------------| +| Mid-refactor inconsistency (some ops on new interface, some on old) | Phase 2 keeps both interfaces wired — only one is active per RM via a constructor switch; integration tests run against both | +| Behavior change masked by passing tests | Add `tests/test_persistence_contract.nim` that asserts exact call count per protocol op (before vs after must match the table in `ANALYSIS_SNAPSHOT_SAVE_POINTS.md`) | +| Memory-first mutation pattern preserved by accident | Move *all* persistence calls to the end of the op, after the lock-held mutation block completes. The dirty flag is set *during* mutation; the save fires *after*. If save fails, the in-memory state is still the source of truth for the next op — but now there's only one possible point of divergence per op, not 10. | +| FFI thread breakage | Phase 3.4 is the FFI cutover; smoke test on both `--mm:refc` and `--mm:orc`, macOS and Linux, before declaring done. ASAN run on the FFI example. | +| Snapshot blob growth surprises | Add a `len()` log on `saveChannelMeta` for the first week of integration; fail-loud if any blob exceeds (configurable) 1 MB | + +--- + +## 5. Acceptance Criteria + +- [ ] All existing `nimble test` cases pass against the new interface +- [ ] New `tests/test_persistence_contract.nim` enforces exactly the call counts from `ANALYSIS_SNAPSHOT_SAVE_POINTS.md` §"Save Points" table +- [ ] New `tests/test_snapshot_codec.nim` round-trips every `ChannelMeta` variant +- [ ] Failure-injection test: kill persistence between `saveChannelMeta` and `updateHistory` → on restart, the manager loads a self-consistent snapshot (no orphan history entries; no dangling buffer references) +- [ ] FFI smoke (`liblogosdelivery`-style) runs clean on macOS+refc, macOS+orc, Linux+refc, Linux+orc +- [ ] `Broker_FFI_API.md` reflects the new contract +- [ ] Bench: snapshot save rate matches the predicted `S + R` (foreground) and ≤ 0.2/s/channel background floor (with dirty-guard) under a synthetic 50-msg/s workload +- [ ] Snapshot blob size on the bench workload matches the estimate in §7 within 2×; outliers logged + +--- + +## 6. Codec & flattening — where protobuf comes in + +### Codec choice + +The KV backend stores opaque blobs. The codec that produces the blob is +**internal to nim-sds**. Protobuf is the natural choice because: + +- The project already uses protobuf for the SDS wire format + (`sds/protobuf.nim` encodes `SdsMessage`). One codec, one toolchain. +- Field-number versioning gives forward/backward compatibility for free — + pairs naturally with the `schemaVersion` field. +- Repeated message fields encode efficiently and round-trip cleanly. + +Concretely: `ChannelMeta` is a top-level protobuf message; `saveChannelMeta` +serializes it to `seq[byte]` and the backend writes that under +`meta:`. On load, the backend returns the bytes; nim-sds +deserializes. + +### Why flatten `Table[Id, Entry]` to `seq[KV]` + +Protobuf's wire format has no first-class "map of bytes-key → message-value" +type in the minimal subset used by `sds/protobuf.nim` (the +`nim-libp2p`-style `minprotobuf`). Even the full proto3 `map` is +encoded on the wire as **repeated KV messages anyway** — the map syntax is +just sugar over `repeated Entry { key = 1; value = 2; }`. + +So flattening is making the wire shape explicit: + +``` +ChannelMeta { + ... + repeated OutgoingRepairKV outgoingRepairBuffer = 5; + repeated IncomingRepairKV incomingRepairBuffer = 6; +} + +OutgoingRepairKV { + bytes messageId = 1; + OutgoingRepairEntry entry = 2; +} +``` + +The `Table` exists only in memory; the wire and disk form is the flat seq. +Decode rebuilds the `Table` by iterating the seq. Cost: one alloc per entry +on encode/decode — negligible against the I/O it replaces. + +`outgoingBuffer` (already a `seq`) and `incomingBuffer` (a `Table` flattened +to `seq[IncomingMessage]` — the key is `message.messageId` so no separate KV +wrapper is needed) follow the same logic. + +--- + +## 7. Snapshot size estimates + +Assumptions (call out — every number below derives from these): + +| Quantity | Assumed bytes | Source | +|----------|---------------|--------| +| `SdsMessageID` | 32 | typical content-addressed id | +| `SdsParticipantID` | 32 | same | +| `SdsChannelID` | 32 | same | +| `bloomFilter` (serialized, in an `SdsMessage`) | 256 | derived from default `bloomFilterCapacity` × `errorRate` | +| `causalHistory` | 10 entries × ~40 B | `maxCausalHistory = 10` from `reliability_config.nim` | +| `repairRequest` in a wire SdsMessage | up to 3 × ~40 B | `maxRepairRequests = 3` | +| Application payload (`content`) — small | 100 B | typical short chat payload | +| Application payload — medium | 1 KB | richer payload | +| Protobuf framing | ~10% overhead | tag bytes + varints | + +**One `SdsMessage` on the wire (no content):** ~700 B +**One `SdsMessage` with 100 B content:** ~800 B +**One `SdsMessage` with 1 KB content:** ~1.7 KB + +Per-entry sizes inside `ChannelMeta`: + +| Entry | Size (100 B payload) | Size (1 KB payload) | Notes | +|-------|----------------------|---------------------|-------| +| `UnacknowledgedMessage` | ~820 B | ~1.7 KB | SdsMessage + sendTime + resendAttempts | +| `IncomingMessage` | ~950 B | ~1.9 KB | SdsMessage + missingDeps (avg 3 × 32 B) | +| `OutgoingRepairKV` | ~110 B | ~110 B | no cached message, payload-independent | +| `IncomingRepairKV` | ~920 B | ~1.8 KB | **cached serialized SdsMessage dominates** | + +Fixed overhead per `ChannelMeta`: ~30 B (schemaVersion + lamportTimestamp + framing). + +### Per-channel snapshot size by load + +| Profile | outBuf | inBuf | outRepair | inRepair | Size (100 B payload) | Size (1 KB payload) | +|---------|--------|-------|-----------|----------|----------------------|---------------------| +| Idle | 0 | 0 | 0 | 0 | **~30 B** | ~30 B | +| Light chat | 2 | 0 | 0 | 0 | **~1.7 KB** | ~3.5 KB | +| Steady | 5 | 1 | 1 | 1 | **~6 KB** | ~12 KB | +| Busy | 10 | 3 | 3 | 3 | **~14 KB** | ~28 KB | +| Heavy, lossy network (SDS-R churning) | 30 | 10 | 20 | 10 | **~45 KB** | ~95 KB | +| Pathological (resend window full, big repair caches) | 50 | 20 | 30 | 20 | **~75 KB** | ~155 KB | + +### Where the bytes go + +| Load profile | Dominant contributor | +|--------------|----------------------| +| Idle / light | Fixed overhead + outgoingBuffer | +| Steady / busy | outgoingBuffer (each entry ~1 KB+) | +| Heavy / lossy | **incomingRepairBuffer** — each KV entry caches a full serialized message for rebroadcast. This is the single biggest amplifier; 20 entries with 1 KB payloads ≈ 36 KB on their own. | + +### Implications + +1. **Typical write is small (1–30 KB).** Comfortably foldable into the + per-op KV write cost; the backend's blob-write cost is bounded. +2. **`IncomingRepairEntry.cachedMessage` is the size lever to watch.** + Under heavy SDS-R activity it dominates the snapshot. If snapshot size + becomes a bottleneck, the optimization is to drop the cache from the + snapshot and re-serialize from `messageHistory` on demand — at the cost + of more CPU and the corner case where the requested message has been + evicted from history between snapshot save and repair sweep firing. +3. **Heavy profile (~95 KB) at the predicted 6/s/ch save rate = ~570 KB/s + per channel.** A 10-channel heavy node is then ~5.7 MB/s of snapshot + churn — well within KV backend throughput, but worth a real bench + before declaring it OK. +4. **The 1 MB hard cap** suggested in §4 stays appropriate; pathological + profile at 1 KB payload is ~155 KB, leaving healthy headroom. + +--- + +## 8. Persistence failure policy — non-fatal, best-effort + +**Change from current branch.** The current implementation treats every +`rePersistenceError` as fatal: the protocol op returns `err()`, the caller +sees a failure, and normal SDS operation breaks even though the in-memory +state is fine. This is wrong for the snapshot model. + +**New policy.** +- In-memory state is the **source of truth** for protocol correctness. + Lamport clock, buffers, history, bloom filter — all live in + `ChannelContext` and are mutated under the lock before any persistence + call. SDS message processing never depends on disk state for correctness + within a session. +- Persistence is **best-effort durability**. A failed `saveChannelMeta` or + `updateHistory` does **not** abort the operation, does not return `err` + to the FFI caller, and does not corrupt protocol semantics. The next op + will issue its own snapshot — if that succeeds, on-disk state is + re-synchronised; if it also fails, the one after that tries again. +- Snapshot writes are **idempotent and self-contained.** Each + `saveChannelMeta` blob is the complete current `ChannelMeta`. A missed + write is fully recovered by any later successful write — no log of + deltas to replay, no compensating action needed. +- Bootstrap loss tolerance: if `loadChannel` fails or returns stale state + on restart, the manager starts from whatever it could load (possibly + empty). Peer traffic and SDS-R repair will re-populate it. This is the + expected behaviour of the bloom-rebuilt-from-history design extended to + the meta blob. + +**Implementation pattern.** At each save point: + +```nim +# end of wrapOutgoingMessage / unwrapReceivedMessage / etc. +if dirty: + let saveRes = await rm.persistence.saveChannelMeta(channelId, snapshot) + if saveRes.isErr: + warn "snapshot save failed; in-memory state unaffected, next op will retry", + channelId = channelId, detail = saveRes.error + # DO NOT return err; protocol op succeeded. +if appended.len > 0 or evicted.len > 0: + let histRes = await rm.persistence.updateHistory(channelId, + HistoryUpdate(append: appended, evict: evicted)) + if histRes.isErr: + warn "history update failed; in-memory log authoritative, next op will retry", + channelId = channelId, detail = histRes.error +return ok(serializedMessage) # protocol op succeeded regardless +``` + +**What still returns `err(rePersistenceError)`.** Only operations whose +**semantic intent** is durability: +- `removeChannel`, `resetReliabilityManager` → must confirm `dropChannel` + succeeded; otherwise the caller may assume disk is clean when it isn't. +- `getOrCreateChannel` on first bootstrap → if `loadChannel` errors (vs. + returns empty), surface it so the caller can decide between "start + fresh in memory" and "abort init". + +**Impact on §5 acceptance criteria.** Add: failure-injection test must +prove that `wrapOutgoingMessage`, `unwrapReceivedMessage`, +`markDependenciesMet`, `checkUnacknowledgedMessages`, `runRepairSweep` all +return `ok` under 100%-failing persistence, with correct in-memory +behaviour and a recovered on-disk state after persistence is restored. + +**Why this is safe.** Each snapshot is a full self-contained blob; +partial-write divergence (the original ANALYSIS §4 critical risk) is +already eliminated by the atomic-blob design. Once that's true, treating +persistence failure as fatal is pure downside — it propagates a +recoverable I/O hiccup into a user-visible protocol failure for no +correctness gain. + +--- + +## 9. What this plan deliberately does NOT do + +- Does not add transaction handles — the KV backend's batch primitive is sufficient +- Does not ship a reference backend — the schema-agnostic KV module in the sibling repo is the production backend +- Does not change the bloom filter persistence policy (still rebuilt from history) +- Does not introduce SDS-R repair extension changes +- Does not touch the FFI surface shape beyond construction of `Persistence` — the existing C API is unchanged +- Does not auto-migrate on-disk data from an older format (no production data exists yet; schemaVersion=1 starts clean) diff --git a/sds.nim b/sds.nim index 9935fff..d859954 100644 --- a/sds.nim +++ b/sds.nim @@ -35,7 +35,7 @@ proc isAcknowledged*( proc reviewAckStatus( rm: ReliabilityManager, msg: SdsMessage -) {.async: (raises: [CatchableError]).} = +): Future[Result[void, ReliabilityError]] {.async: (raises: []).} = try: var rbf: Option[RollingBloomFilter] if msg.bloomFilter.len > 0: @@ -59,7 +59,7 @@ proc reviewAckStatus( rbf = none[RollingBloomFilter]() if msg.channelId notin rm.channels: - return + return ok() let channel = rm.channels[msg.channelId] var toDelete: seq[(int, SdsMessageID)] = @[] @@ -75,12 +75,14 @@ proc reviewAckStatus( inc i for k in countdown(toDelete.high, 0): - let (idx, ackedId) = toDelete[k] - channel.outgoingBuffer.delete(idx) - await rm.persistence.removeOutgoing(msg.channelId, ackedId) - except CatchableError as e: + # Phase 2B: in-memory deletion only; the caller's op-end trySaveMeta + # captures the new outgoingBuffer state. The msgId half of the + # tuple is unused now that there is no per-row persistence call. + channel.outgoingBuffer.delete(toDelete[k][0]) + ok() + except CatchableError: error "Failed to review ack status", msg = getCurrentExceptionMsg() - raise e + err(ReliabilityError.reInternalError) proc wrapOutgoingMessage*( rm: ReliabilityManager, @@ -98,8 +100,10 @@ proc wrapOutgoingMessage*( await rm.lock.acquire() try: try: - let channel = await rm.getOrCreateChannel(channelId) - await rm.updateLamportTimestamp(getTime().toUnix, channelId) + let channel = (await rm.getOrCreateChannel(channelId)).valueOr: + return err(error) + (await rm.updateLamportTimestamp(getTime().toUnix, channelId)).isOkOr: + return err(error) let bfResult = serializeBloomFilter(channel.bloomFilter.filter) if bfResult.isErr: @@ -125,13 +129,16 @@ proc wrapOutgoingMessage*( expiredKeys.add(eligible[i][0]) for key in expiredKeys: channel.outgoingRepairBuffer.del(key) - await rm.persistence.removeOutgoingRepair(channelId, key) + # Phase 2B: in-memory deletion only; op-end trySaveMeta covers it. + let causalHistory = ( + await rm.getRecentHistoryEntries(rm.config.maxCausalHistory, channelId) + ).valueOr: + return err(error) let msg = SdsMessage.init( messageId = messageId, lamportTimestamp = channel.lamportTimestamp, - causalHistory = - await rm.getRecentHistoryEntries(rm.config.maxCausalHistory, channelId), + causalHistory = causalHistory, channelId = channelId, content = message, bloomFilter = bfResult.get(), @@ -143,13 +150,21 @@ proc wrapOutgoingMessage*( message = msg, sendTime = getTime(), resendAttempts = 0 ) channel.outgoingBuffer.add(unackMsg) - await rm.persistence.saveOutgoing(channelId, unackMsg) + # Phase 2B: in-memory append only; op-end trySaveMeta covers it. channel.bloomFilter.add(msg.messageId) - # The full SdsMessage carries senderId and content, so a single - # addToHistory replaces the old triple-write to messageHistory, - # messageCache, and messageSenders. - await rm.addToHistory(msg, channelId) + # addToHistory mutates in-memory state and queues the append/evict + # on the channel's pending-history queue; persistence happens + # ONCE at op end via tryUpdateHistory. + (await rm.addToHistory(msg, channelId)).isOkOr: + return err(error) + + # Op end: one meta snapshot + one history flush, paired under the + # lock per the Persistence atomicity contract. tryUpdateHistory + # flushes the channel's pending queue (this op's mutations PLUS + # any leftovers from a prior failed write — R2 retry). + await rm.trySaveMeta(channelId, channel) + await rm.tryUpdateHistory(channelId) return serializeMessage(msg) except CatchableError: @@ -165,17 +180,22 @@ proc wrapOutgoingMessage*( proc processIncomingBuffer( rm: ReliabilityManager, channelId: SdsChannelID -) {.async: (raises: [CatchableError]).} = +): Future[Result[void, ReliabilityError]] {.async: (raises: []).} = + ## Cascade-deliver any buffered messages whose dependencies are now met. + ## Each `addToHistory` call queues its append/evict on the channel's + ## pending-history queue; the *caller* (a public protocol op) issues + ## ONE `tryUpdateHistory` at op end to flush the whole cascade in a + ## single round-trip. try: await rm.lock.acquire() try: if channelId notin rm.channels: error "Channel does not exist", channelId = channelId - return + return ok() let channel = rm.channels[channelId] if channel.incomingBuffer.len == 0: - return + return ok() var processed = initHashSet[SdsMessageID]() var readyToProcess = newSeq[SdsMessageID]() @@ -190,7 +210,8 @@ proc processIncomingBuffer( continue if msgId in channel.incomingBuffer: - await rm.addToHistory(channel.incomingBuffer[msgId].message, channelId) + (await rm.addToHistory(channel.incomingBuffer[msgId].message, channelId)).isOkOr: + return err(error) if not rm.onMessageReady.isNil(): {.cast(raises: []).}: rm.onMessageReady(msgId, channelId) @@ -199,22 +220,24 @@ proc processIncomingBuffer( for remainingId, entry in channel.incomingBuffer: if remainingId notin processed: if msgId in entry.missingDeps: + # Phase 2B: in-memory dep-set shrink only; the parent op + # (unwrap / markDeps) issues a single trySaveMeta at its + # end that captures the final incomingBuffer state. channel.incomingBuffer[remainingId].missingDeps.excl(msgId) - await rm.persistence.saveIncoming( - channelId, channel.incomingBuffer[remainingId] - ) if channel.incomingBuffer[remainingId].missingDeps.len == 0: readyToProcess.add(remainingId) for msgId in processed: + # Phase 2B: in-memory deletion only; parent op's trySaveMeta covers + # the drained buffer state. channel.incomingBuffer.del(msgId) - await rm.persistence.removeIncoming(channelId, msgId) + ok() finally: rm.lock.release() - except CatchableError as e: + except CatchableError: error "Failed to process incoming buffer", channelId = channelId, msg = getCurrentExceptionMsg() - raise e + err(ReliabilityError.reInternalError) proc unwrapReceivedMessage*( rm: ReliabilityManager, message: seq[byte] @@ -232,22 +255,29 @@ proc unwrapReceivedMessage*( let msg = deserializeMessage(message).valueOr: return err(ReliabilityError.reDeserializationError) - let channel = await rm.getOrCreateChannel(channelId) + let channel = (await rm.getOrCreateChannel(channelId)).valueOr: + return err(error) # SDS-R: opportunistic repair-buffer cleanup — applies to duplicates too, # so rebroadcasts cancel redundant responses on peers that already have the message. + # Phase 2B: in-memory deletes only; op-end trySaveMeta covers it. channel.outgoingRepairBuffer.del(msg.messageId) - await rm.persistence.removeOutgoingRepair(channelId, msg.messageId) channel.incomingRepairBuffer.del(msg.messageId) - await rm.persistence.removeIncomingRepair(channelId, msg.messageId) if msg.messageId in channel.messageHistory: + # Duplicate: no history change. Still flush the meta (repair-buffer + # dels above are mutations) and the history queue (any pending + # entries from a prior failed write get retried here too). + await rm.trySaveMeta(channelId, channel) + await rm.tryUpdateHistory(channelId) return ok((msg.content, @[], channelId)) channel.bloomFilter.add(msg.messageId) - await rm.updateLamportTimestamp(msg.lamportTimestamp, channelId) - await rm.reviewAckStatus(msg) + (await rm.updateLamportTimestamp(msg.lamportTimestamp, channelId)).isOkOr: + return err(error) + (await rm.reviewAckStatus(msg)).isOkOr: + return err(error) # SDS-R: process incoming repair requests from this message. We can only # answer for messages we have actually delivered (i.e. that live in @@ -255,9 +285,9 @@ proc unwrapReceivedMessage*( # to confidently rebroadcast. let now = getTime() for repairEntry in msg.repairRequest: - # Remove from our own outgoing repair buffer (someone else is also requesting) + # Remove from our own outgoing repair buffer (someone else is also requesting). + # Phase 2B: in-memory delete only; op-end trySaveMeta covers it. channel.outgoingRepairBuffer.del(repairEntry.messageId) - await rm.persistence.removeOutgoingRepair(channelId, repairEntry.messageId) if repairEntry.messageId in channel.messageHistory and rm.participantId.len > 0 and repairEntry.senderId.len > 0: if isInResponseGroup( @@ -276,10 +306,8 @@ proc unwrapReceivedMessage*( cachedMessage: serialized.get(), minTimeRepairResp: now + tResp, ) + # Phase 2B: in-memory insert only; op-end trySaveMeta covers it. channel.incomingRepairBuffer[repairEntry.messageId] = inEntry - await rm.persistence.saveIncomingRepair( - channelId, repairEntry.messageId, inEntry - ) var missingDeps = rm.checkDependencies(msg.causalHistory, channelId) @@ -292,21 +320,20 @@ proc unwrapReceivedMessage*( if depsInBuffer: let entry = IncomingMessage.init(message = msg, missingDeps = initHashSet[SdsMessageID]()) + # Phase 2B: in-memory insert only; op-end trySaveMeta covers it. channel.incomingBuffer[msg.messageId] = entry - await rm.persistence.saveIncoming(channelId, entry) else: - await rm.addToHistory(msg, channelId) + (await rm.addToHistory(msg, channelId)).isOkOr: + return err(error) # Unblock any buffered messages that were waiting on this one. - var unblocked: seq[SdsMessageID] = @[] for pendingId, entry in channel.incomingBuffer: if msg.messageId in entry.missingDeps: channel.incomingBuffer[pendingId].missingDeps.excl(msg.messageId) - unblocked.add(pendingId) - for pendingId in unblocked: - await rm.persistence.saveIncoming( - channelId, channel.incomingBuffer[pendingId] - ) - await rm.processIncomingBuffer(channelId) + # Cascade — addToHistory calls within processIncomingBuffer queue + # their entries on the channel's pending-history queue, flushed + # by the single op-end tryUpdateHistory below. + (await rm.processIncomingBuffer(channelId)).isOkOr: + return err(error) if not rm.onMessageReady.isNil(): {.cast(raises: []).}: rm.onMessageReady(msg.messageId, channelId) @@ -314,8 +341,8 @@ proc unwrapReceivedMessage*( let entry = IncomingMessage.init( message = msg, missingDeps = missingDeps.getMessageIds().toHashSet() ) + # Phase 2B: in-memory insert only; op-end trySaveMeta covers it. channel.incomingBuffer[msg.messageId] = entry - await rm.persistence.saveIncoming(channelId, entry) if not rm.onMissingDependencies.isNil(): {.cast(raises: []).}: rm.onMissingDependencies(msg.messageId, missingDeps, channelId) @@ -330,8 +357,14 @@ proc unwrapReceivedMessage*( ) let outEntry = OutgoingRepairEntry(outHistEntry: dep, minTimeRepairReq: now + tReq) + # Phase 2B: in-memory insert only; op-end trySaveMeta covers it. channel.outgoingRepairBuffer[dep.messageId] = outEntry - await rm.persistence.saveOutgoingRepair(channelId, dep.messageId, outEntry) + + # Op end: one meta snapshot + one history flush, paired under the + # lock. The flush is the single point where any cascade-driven + # appends/evicts hit disk (R2 queue absorbs failures). + await rm.trySaveMeta(channelId, channel) + await rm.tryUpdateHistory(channelId) return ok((msg.content, missingDeps, channelId)) except CatchableError: @@ -352,21 +385,25 @@ proc markDependenciesMet*( if not channel.bloomFilter.contains(msgId): channel.bloomFilter.add(msgId) - var unblocked: seq[SdsMessageID] = @[] + # Phase 2B: in-memory dep-set shrink + repair-buffer dels only; the + # op-end trySaveMeta below covers all mutations atomically. for pendingId, entry in channel.incomingBuffer: if msgId in entry.missingDeps: channel.incomingBuffer[pendingId].missingDeps.excl(msgId) - unblocked.add(pendingId) - for pendingId in unblocked: - await rm.persistence.saveIncoming(channelId, channel.incomingBuffer[pendingId]) - # SDS-R: clear from repair buffers (dependency now met) + # SDS-R: clear from repair buffers (dependency now met). channel.outgoingRepairBuffer.del(msgId) - await rm.persistence.removeOutgoingRepair(channelId, msgId) channel.incomingRepairBuffer.del(msgId) - await rm.persistence.removeIncomingRepair(channelId, msgId) - await rm.processIncomingBuffer(channelId) + (await rm.processIncomingBuffer(channelId)).isOkOr: + return err(error) + + # Op end: one meta snapshot + one history flush, paired under the lock. + # The flush covers any cascade-driven appends/evicts queued during + # processIncomingBuffer. + if channelId in rm.channels: + await rm.trySaveMeta(channelId, rm.channels[channelId]) + await rm.tryUpdateHistory(channelId) return ok() except CatchableError: error "Failed to mark dependencies as met", @@ -399,17 +436,24 @@ proc setCallbacks*( proc checkUnacknowledgedMessages( rm: ReliabilityManager, channelId: SdsChannelID -) {.async: (raises: []).} = +): Future[Result[void, ReliabilityError]] {.async: (raises: []).} = + ## Persistence model (PLAN_SNAPSHOT_PERSISTENCE.md phase 2.2): per-entry + ## saveOutgoing / removeOutgoing calls are replaced by a single + ## `trySaveMeta` at the end of the pass, *only* if the buffer actually + ## changed (resend-attempt incremented, or entry expired). Failure is + ## logged but does not abort the pass — next tick reissues a fresh + ## snapshot. try: await rm.lock.acquire() try: if channelId notin rm.channels: error "Channel does not exist", channelId = channelId - return + return ok() let channel = rm.channels[channelId] let now = getTime() var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[] + var dirty = false for unackMsg in channel.outgoingBuffer: let elapsed = now - unackMsg.sendTime @@ -419,27 +463,34 @@ proc checkUnacknowledgedMessages( updatedMsg.resendAttempts += 1 updatedMsg.sendTime = now newOutgoingBuffer.add(updatedMsg) - await rm.persistence.saveOutgoing(channelId, updatedMsg) + dirty = true else: if not rm.onMessageSent.isNil(): {.cast(raises: []).}: rm.onMessageSent(unackMsg.message.messageId, channelId) - await rm.persistence.removeOutgoing(channelId, unackMsg.message.messageId) + dirty = true # entry dropped from newOutgoingBuffer else: newOutgoingBuffer.add(unackMsg) channel.outgoingBuffer = newOutgoingBuffer + if dirty: + await rm.trySaveMeta(channelId, channel) + ok() finally: rm.lock.release() except CatchableError: error "Failed to check unacknowledged messages", channelId = channelId, msg = getCurrentExceptionMsg() + err(ReliabilityError.reInternalError) proc periodicBufferSweep(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} = while true: try: for channelId, channel in rm.channels: - await rm.checkUnacknowledgedMessages(channelId) + # Background maintenance has no caller to return to: a persistence + # error is logged (by reliabilityErr) and the sweep continues; the + # next tick retries. + discard await rm.checkUnacknowledgedMessages(channelId) await rm.cleanBloomFilter(channelId) except CatchableError: error "Error in periodic buffer sweep", msg = getCurrentExceptionMsg() @@ -455,18 +506,28 @@ proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledErr error "Error in periodic sync", msg = getCurrentExceptionMsg() await sleepAsync(chronos.seconds(rm.config.syncMessageInterval.inSeconds)) -proc runRepairSweep*(rm: ReliabilityManager) {.async: (raises: []).} = +proc runRepairSweep*( + rm: ReliabilityManager +): Future[Result[void, ReliabilityError]] {.async: (raises: []).} = ## SDS-R: Runs a single pass of the repair sweep. ## - Incoming: fires onRepairReady for expired T_resp entries and removes them ## - Outgoing: drops entries past T_max window ## Exposed so it can be driven directly in tests; also invoked by periodicRepairSweep. ## Acquires rm.lock so the repair buffers cannot be observed mid-mutation by ## a concurrent wrapOutgoingMessage / unwrapReceivedMessage on another task. + ## + ## Persistence model (PLAN_SNAPSHOT_PERSISTENCE.md phase 2.1): per-entry + ## removeIncomingRepair / removeOutgoingRepair calls are replaced by a + ## single `trySaveMeta` per *dirty* channel at the end of that channel's + ## sweep. A persistence failure is logged but DOES NOT abort the sweep — + ## in-memory state is the source of truth and the next op (or sweep tick) + ## will issue a fresh self-contained snapshot. try: await rm.lock.acquire() try: let now = getTime() for channelId, channel in rm.channels: + var dirty = false try: # Check incoming repair buffer for expired T_resp (time to rebroadcast) var toRebroadcast: seq[SdsMessageID] = @[] @@ -477,7 +538,7 @@ proc runRepairSweep*(rm: ReliabilityManager) {.async: (raises: []).} = for msgId in toRebroadcast: let entry = channel.incomingRepairBuffer[msgId] channel.incomingRepairBuffer.del(msgId) - await rm.persistence.removeIncomingRepair(channelId, msgId) + dirty = true if not rm.onRepairReady.isNil(): {.cast(raises: []).}: rm.onRepairReady(entry.cachedMessage, channelId) @@ -490,20 +551,29 @@ proc runRepairSweep*(rm: ReliabilityManager) {.async: (raises: []).} = toRemove.add(msgId) for msgId in toRemove: channel.outgoingRepairBuffer.del(msgId) - await rm.persistence.removeOutgoingRepair(channelId, msgId) + dirty = true except CatchableError: error "Error in repair sweep for channel", channelId = channelId, msg = getCurrentExceptionMsg() + # Snapshot only if this channel actually mutated. Skipping the call + # when clean honours the dirty-flag guard in ANALYSIS_SNAPSHOT_SAVE_POINTS + # — otherwise an idle node still issues 0.2 saves/s/channel just + # because the periodic sweep ran. + if dirty: + await rm.trySaveMeta(channelId, channel) + ok() finally: rm.lock.release() except CatchableError: error "Error in repair sweep", msg = getCurrentExceptionMsg() + err(ReliabilityError.reInternalError) proc periodicRepairSweep(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} = ## SDS-R: Periodically checks repair buffers for expired entries. while true: try: - await rm.runRepairSweep() + # Background maintenance: log a failed pass and retry next tick. + discard await rm.runRepairSweep() except CatchableError: error "Error in periodic repair sweep", msg = getCurrentExceptionMsg() await sleepAsync(chronos.milliseconds(rm.config.repairSweepInterval.inMilliseconds)) @@ -526,13 +596,16 @@ proc resetReliabilityManager*( try: try: for channelId, channel in rm.channels: - await rm.dropChannelFromPersistence(channelId) + (await rm.dropChannelFromPersistence(channelId)).isOkOr: + return err(error) channel.lamportTimestamp = 0 channel.messageHistory.clear() channel.outgoingBuffer.setLen(0) channel.incomingBuffer.clear() channel.outgoingRepairBuffer.clear() channel.incomingRepairBuffer.clear() + channel.pendingHistoryAppends.clear() + channel.pendingHistoryEvicts.clear() channel.bloomFilter = RollingBloomFilter.init( rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate ) diff --git a/sds.nimble b/sds.nimble index 6139576..7c5727a 100644 --- a/sds.nimble +++ b/sds.nimble @@ -1,7 +1,7 @@ import strutils, os # Package -version = "0.2.4" +version = "0.3.0" author = "Logos Messaging Team" description = "E2E Scalable Data Sync API" license = "MIT" @@ -68,6 +68,7 @@ task test, "Run the test suite": exec "nim c -r --outdir:build tests/test_bloom.nim" exec "nim c -r --outdir:build tests/test_reliability.nim" exec "nim c -r --outdir:build tests/test_persistence.nim" + exec "nim c -r --outdir:build tests/test_snapshot_codec.nim" task libsdsDynamicWindows, "Generate bindings": let outLibNameAndExt = "libsds.dll" diff --git a/sds/sds_utils.nim b/sds/sds_utils.nim index 105102f..6459a4b 100644 --- a/sds/sds_utils.nim +++ b/sds/sds_utils.nim @@ -1,4 +1,4 @@ -import std/[times, tables, sequtils, hashes] +import std/[times, tables, sequtils, sets, hashes] import chronos, chronicles, results import ./rolling_bloom_filter import @@ -15,13 +15,158 @@ export proc defaultConfig*(): ReliabilityConfig = return ReliabilityConfig.init() -proc dropChannelFromPersistence*( +proc reliabilityErr*(detail: string): ReliabilityError {.gcsafe, raises: [].} = + ## Maps a backend-supplied persistence error string onto the + ## `rePersistenceError` enum value. The enum carries no payload, so the + ## original detail is logged here — this is the single point where a + ## persistence failure is recorded, while the enum value travels up the + ## `Result` chain to the public API caller, who decides what to do. + ## + ## With the snapshot-based Persistence interface, most protocol ops no + ## longer propagate persistence errors at all — they log and continue + ## (see PLAN_SNAPSHOT_PERSISTENCE.md §8). This helper is still used by + ## the durability-intent ops (removeChannel, resetReliabilityManager, + ## getOrCreateChannel) that retain err-on-failure semantics. + warn "persistence operation failed", detail = detail + ReliabilityError.rePersistenceError + +proc snapshotMeta*(channel: ChannelContext): ChannelMeta {.gcsafe, raises: [].} = + ## Captures the current in-memory state of a `ChannelContext` as a + ## `ChannelMeta` blob, suitable for `Persistence.saveChannelMeta`. + ## + ## The in-memory shape uses `Table`-keyed buffers for fast lookup; + ## `ChannelMeta` flattens them to `seq`s for stable wire serialization + ## (see PLAN §6). The bloom filter and message history are intentionally + ## excluded — the former is rebuilt from the latter on bootstrap, and + ## the latter is persisted separately via `updateHistory`. + result = ChannelMeta.init() + result.lamportTimestamp = channel.lamportTimestamp + for u in channel.outgoingBuffer: + result.outgoingBuffer.add(u) + for _, m in channel.incomingBuffer.pairs: + result.incomingBuffer.add(m) + for id, e in channel.outgoingRepairBuffer.pairs: + result.outgoingRepairBuffer.add(OutgoingRepairKV(messageId: id, entry: e)) + for id, e in channel.incomingRepairBuffer.pairs: + result.incomingRepairBuffer.add(IncomingRepairKV(messageId: id, entry: e)) + +proc trySaveMeta*( + rm: ReliabilityManager, channelId: SdsChannelID, channel: ChannelContext +) {.async: (raises: []).} = + ## Best-effort meta snapshot save. Per PLAN §8 the protocol op does NOT + ## abort on persistence failure — in-memory state is the source of truth + ## and the next op's snapshot will re-synchronise on-disk state. + ## + ## This helper is the single point where snapshot-save failures are + ## logged; callers do not need to handle the Result. + let res = await rm.persistence.saveChannelMeta(channelId, snapshotMeta(channel)) + if res.isErr: + warn "snapshot save failed; in-memory state authoritative, next op will retry", + channelId = channelId, detail = res.error + +proc queueHistoryAppend*(channel: ChannelContext, msgId: SdsMessageID) = + ## Push an append onto the pending history queue. Only the id is + ## stored — the full SdsMessage is looked up from `messageHistory` at + ## flush time (invariant: every queued id is present in messageHistory). + ## + ## Merge rule: **latest operation wins.** Cancels any pending evict for + ## the same id, then adds. Handles the evict-then-re-add sequence + ## correctly (e.g. SDS-R repair re-delivers a previously-evicted + ## message while the backend is unreachable). + channel.pendingHistoryEvicts.excl(msgId) + channel.pendingHistoryAppends.incl(msgId) + +proc queueHistoryEvict*(channel: ChannelContext, msgId: SdsMessageID) = + ## Push an evict onto the pending history queue. Merge rule symmetric + ## with `queueHistoryAppend`: cancels any pending append for the same + ## id (the just-evicted message no longer needs to be persisted as an + ## addition), then adds to the evict set. + channel.pendingHistoryAppends.excl(msgId) + channel.pendingHistoryEvicts.incl(msgId) + +proc tryUpdateHistory*( rm: ReliabilityManager, channelId: SdsChannelID ) {.async: (raises: []).} = + ## Flush the channel's pending history queue to disk. + ## + ## The pending queue (`channel.pendingHistoryAppends` / + ## `pendingHistoryEvicts`) plays a DUAL role — and that's deliberate: + ## 1. **Per-op accumulator.** Every `addToHistory` call pushes its + ## mutation into this queue but does NOT persist. A protocol op + ## that invokes `addToHistory` N times (e.g. a + ## `processIncomingBuffer` cascade) leaves N entries queued and + ## issues exactly ONE `tryUpdateHistory` at op end — one + ## round-trip per op regardless of cascade depth. This fixes PR + ## #72 review comments #2 and #3. + ## 2. **R2 retry queue.** If the flush fails, the queue is NOT + ## cleared. The next op's `addToHistory` calls add to it; the + ## next op's `tryUpdateHistory` retries the merged batch. This + ## fixes PR #72 review comment #1 (delta loss). + ## + ## Both roles share the same data structure because they want the same + ## semantics: "merge everything pending into one batch and try to + ## flush". Failure is non-fatal at the FFI boundary (PLAN §8) — the + ## in-memory state is the source of truth. + ## + ## Callers MUST invoke this once at the end of every protocol op (even + ## when this op had no history changes) — otherwise a previously-failed + ## batch could sit on the queue indefinitely. + var channel: ChannelContext + try: + if channelId notin rm.channels: + return + channel = rm.channels[channelId] + except KeyError: + return # checked `in` above; unreachable, but tables can raise per spec + + if channel.pendingHistoryAppends.len == 0 and + channel.pendingHistoryEvicts.len == 0: + return # nothing to flush — no round-trip cost + + var batch = HistoryUpdate.init() + # Look up each queued id in messageHistory (source of truth). The + # invariant on pendingHistoryAppends guarantees the id is present; + # the defensive check below logs any violation rather than crashing. + for id in channel.pendingHistoryAppends: + try: + if id in channel.messageHistory: + batch.append.add(channel.messageHistory[id]) + else: + warn "queued append id missing from messageHistory; invariant violated, skipping", + channelId = channelId, msgId = id + except KeyError: + discard # unreachable — `in` was true + for id in channel.pendingHistoryEvicts: + batch.evict.add(id) + + let res = await rm.persistence.updateHistory(channelId, batch) + if res.isOk: + channel.pendingHistoryAppends.clear() + channel.pendingHistoryEvicts.clear() + else: + warn "history update failed; queued for retry on next op", + channelId = channelId, + pendingAppends = channel.pendingHistoryAppends.len, + pendingEvicts = channel.pendingHistoryEvicts.len, + detail = res.error + if channel.pendingHistoryAppends.len > rm.config.maxMessageHistory: + warn "pending history queue exceeds maxMessageHistory; backend may be stuck", + channelId = channelId, + pendingAppends = channel.pendingHistoryAppends.len + +proc dropChannelFromPersistence*( + rm: ReliabilityManager, channelId: SdsChannelID +): Future[Result[void, ReliabilityError]] {.async: (raises: []).} = ## Wipes all persisted state for a channel via a single backend call. ## Called by removeChannel / resetReliabilityManager before they clear ## in-memory state. Backend executes the wipe in one transaction. - await rm.persistence.dropChannel(channelId) + ## + ## Phase 2D: uses `persistenceV2.dropChannel`. This op DOES propagate + ## err on failure (durability is the semantic intent — the caller asked + ## us to confirm a disk wipe; we cannot silently lie). See PLAN §8. + (await rm.persistence.dropChannel(channelId)).isOkOr: + return err(reliabilityErr(error)) + ok() proc cleanup*(rm: ReliabilityManager) {.async: (raises: []).} = ## Releases in-memory state. Does NOT wipe persistence — the manager may be @@ -47,6 +192,8 @@ proc cleanup*(rm: ReliabilityManager) {.async: (raises: []).} = channel.messageHistory.clear() channel.outgoingRepairBuffer.clear() channel.incomingRepairBuffer.clear() + channel.pendingHistoryAppends.clear() + channel.pendingHistoryEvicts.clear() rm.channels.clear() finally: rm.lock.release() @@ -69,38 +216,52 @@ proc cleanBloomFilter*( proc addToHistory*( rm: ReliabilityManager, msg: SdsMessage, channelId: SdsChannelID -) {.async: (raises: []).} = - ## Inserts a delivered message into the channel's history map and evicts the - ## eldest entries when the bound is exceeded. The full SdsMessage is kept so - ## senderId is available for downstream causal-history population and the - ## bytes can be re-serialized on demand to answer SDS-R repair requests. +): Future[Result[void, ReliabilityError]] {.async: (raises: []).} = + ## Inserts a delivered message into the channel's history map, evicts + ## the eldest entries past `maxMessageHistory`, and queues the resulting + ## append+evict on the channel's pending-history queue. Does NOT issue + ## a persistence call — the caller's op-end `tryUpdateHistory` flushes + ## the queue in one round-trip. + ## + ## A cascade of N unblocked messages (e.g. `processIncomingBuffer`) + ## therefore leaves N entries queued and triggers ONE persistence call + ## at op end, not N. Fixes PR #72 review #2/#3. + ## + ## Direct callers (tests, ad-hoc) that want the disk write to land + ## immediately should follow this with `await rm.tryUpdateHistory(channelId)`. try: if channelId in rm.channels: let channel = rm.channels[channelId] channel.messageHistory[msg.messageId] = msg - await rm.persistence.appendLogEntry(channelId, msg) + queueHistoryAppend(channel, msg.messageId) while channel.messageHistory.len > rm.config.maxMessageHistory: var firstKey: SdsMessageID for k in channel.messageHistory.keys: firstKey = k break channel.messageHistory.del(firstKey) - await rm.persistence.removeLogEntry(channelId, firstKey) + queueHistoryEvict(channel, firstKey) + ok() except CatchableError: error "Failed to add to history", channelId = channelId, msgId = msg.messageId, error = getCurrentExceptionMsg() + err(ReliabilityError.reInternalError) proc updateLamportTimestamp*( rm: ReliabilityManager, msgTs: int64, channelId: SdsChannelID -) {.async: (raises: []).} = +): Future[Result[void, ReliabilityError]] {.async: (raises: []).} = + ## Pure in-memory update (phase 2B). The new lamport value is captured + ## by the op-end `trySaveMeta` issued by the calling protocol op; no + ## per-mutation persistence call here. try: if channelId in rm.channels: let channel = rm.channels[channelId] channel.lamportTimestamp = max(msgTs, channel.lamportTimestamp) + 1 - await rm.persistence.saveLamport(channelId, channel.lamportTimestamp) + ok() except CatchableError: error "Failed to update lamport timestamp", channelId = channelId, msgTs = msgTs, error = getCurrentExceptionMsg() + err(ReliabilityError.reInternalError) proc newHistoryEntry*( messageId: SdsMessageID, retrievalHint: seq[byte] = @[] @@ -167,7 +328,7 @@ proc isInResponseGroup*( proc getRecentHistoryEntries*( rm: ReliabilityManager, n: int, channelId: SdsChannelID -): Future[seq[HistoryEntry]] {.async: (raises: []).} = +): Future[Result[seq[HistoryEntry], ReliabilityError]] {.async: (raises: []).} = ## Get recent history entries for sending in causal history. ## Populates retrieval hints and senderId (SDS-R) for each entry. try: @@ -184,16 +345,24 @@ proc getRecentHistoryEntries*( {.cast(raises: []).}: entry.retrievalHint = rm.onRetrievalHint(msgId) if entry.retrievalHint.len > 0: - await rm.persistence.setRetrievalHint(msgId, entry.retrievalHint) + # Phase 2B: best-effort hint persistence via V2. Non-fatal — + # hints are an optimisation; a missing hint just means the + # peer falls back to slower retrieval. + let hintRes = await rm.persistence.setRetrievalHint( + msgId, entry.retrievalHint + ) + if hintRes.isErr: + warn "retrieval hint save failed; continuing", + msgId = msgId, detail = hintRes.error entry.senderId = channel.messageHistory[msgId].senderId entries.add(entry) - return entries + ok(entries) else: - return @[] + ok(newSeq[HistoryEntry]()) except CatchableError: error "Failed to get recent history entries", channelId = channelId, n = n, error = getCurrentExceptionMsg() - return @[] + err(ReliabilityError.reInternalError) proc checkDependencies*( rm: ReliabilityManager, deps: seq[HistoryEntry], channelId: SdsChannelID @@ -269,11 +438,16 @@ proc getIncomingBuffer*( proc getOrCreateChannel*( rm: ReliabilityManager, channelId: SdsChannelID -): Future[ChannelContext] {.async: (raises: [CatchableError]).} = +): Future[Result[ChannelContext, ReliabilityError]] {.async: (raises: []).} = ## Returns the channel context, creating and bootstrapping it from the ## persistence backend if it does not yet exist in memory. The bloom filter ## is rebuilt deterministically from the loaded message history rather than ## persisted directly. Caller is expected to hold rm.lock. + ## + ## Phase 2C: bootstrap via `persistenceV2.loadChannel`. Bootstrap DOES + ## propagate err on load failure — the caller asked us to materialise a + ## channel and we cannot do that without knowing the prior state. See + ## PLAN §8. try: if channelId notin rm.channels: let channel = ChannelContext.new( @@ -281,25 +455,28 @@ proc getOrCreateChannel*( rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate ) ) - let snapshot = await rm.persistence.loadAllForChannel(channelId) - channel.lamportTimestamp = snapshot.lamportTimestamp - for msg in snapshot.messageHistory: + let data = (await rm.persistence.loadChannel(channelId)).valueOr: + return err(reliabilityErr(error)) + channel.lamportTimestamp = data.meta.lamportTimestamp + # Backend contract: messageHistory MUST be ordered oldest-first. + # If a backend violates this, FIFO eviction breaks across restarts. + for msg in data.messageHistory: channel.messageHistory[msg.messageId] = msg channel.bloomFilter.add(msg.messageId) - for unack in snapshot.outgoingBuffer: + for unack in data.meta.outgoingBuffer: channel.outgoingBuffer.add(unack) - for incoming in snapshot.incomingBuffer: + for incoming in data.meta.incomingBuffer: channel.incomingBuffer[incoming.message.messageId] = incoming - for (msgId, entry) in snapshot.outgoingRepairBuffer: - channel.outgoingRepairBuffer[msgId] = entry - for (msgId, entry) in snapshot.incomingRepairBuffer: - channel.incomingRepairBuffer[msgId] = entry + for kv in data.meta.outgoingRepairBuffer: + channel.outgoingRepairBuffer[kv.messageId] = kv.entry + for kv in data.meta.incomingRepairBuffer: + channel.incomingRepairBuffer[kv.messageId] = kv.entry rm.channels[channelId] = channel - return rm.channels[channelId] - except CatchableError as e: + ok(rm.channels[channelId]) + except CatchableError: error "Failed to get or create channel", channelId = channelId, error = getCurrentExceptionMsg() - raise e + err(ReliabilityError.reInternalError) proc ensureChannel*( rm: ReliabilityManager, channelId: SdsChannelID @@ -307,13 +484,9 @@ proc ensureChannel*( try: await rm.lock.acquire() try: - try: - discard await rm.getOrCreateChannel(channelId) - return ok() - except CatchableError: - error "Failed to ensure channel", - channelId = channelId, msg = getCurrentExceptionMsg() - return err(ReliabilityError.reInternalError) + (await rm.getOrCreateChannel(channelId)).isOkOr: + return err(error) + return ok() finally: rm.lock.release() except CatchableError: @@ -330,12 +503,15 @@ proc removeChannel*( try: if channelId in rm.channels: let channel = rm.channels[channelId] - await rm.dropChannelFromPersistence(channelId) + (await rm.dropChannelFromPersistence(channelId)).isOkOr: + return err(error) channel.outgoingBuffer.setLen(0) channel.incomingBuffer.clear() channel.messageHistory.clear() channel.outgoingRepairBuffer.clear() channel.incomingRepairBuffer.clear() + channel.pendingHistoryAppends.clear() + channel.pendingHistoryEvicts.clear() rm.channels.del(channelId) return ok() except CatchableError: diff --git a/sds/snapshot_codec.nim b/sds/snapshot_codec.nim new file mode 100644 index 0000000..7b626c6 --- /dev/null +++ b/sds/snapshot_codec.nim @@ -0,0 +1,326 @@ +## Storage encoding for the snapshot persistence types. +## +## This is the codec nim-sds runs on its side of the persistence boundary +## to turn a `ChannelMeta` (or `ChannelData`, or `HistoryUpdate`) into the +## opaque `seq[byte]` blob the KV persistence backend stores. The KV +## backend treats the blob as fully opaque. See PLAN_SNAPSHOT_PERSISTENCE.md +## §1.5 for why this codec exists at all and §6 for the choice of protobuf. +## +## This is NOT the SDS network wire format — that lives in `sds/protobuf.nim` +## and is unchanged. Encoders for `SdsMessage` and `HistoryEntry` are reused +## from there to avoid maintaining two codecs for the same shape. + +{.push raises: [].} + +import std/[sets, times] +import libp2p/protobuf/minprotobuf +import ./types/[ + channel_meta, history_update, sds_message, sds_message_id, history_entry, + unacknowledged_message, incoming_message, repair_entry, reliability_error, +] +import ./protobufutil +import ./protobuf as wire + +export channel_meta, history_update + +# --------------------------------------------------------------------------- +# Time <-> int64 unix milliseconds +# --------------------------------------------------------------------------- +# The protocol uses `getTime()` (wall clock). For wire stability we encode +# as unix milliseconds in int64 (zigzag not needed — pre-1970 values do not +# occur in practice). Sub-millisecond precision is intentionally dropped: +# the protocol's repair backoff windows are seconds-scale. + +proc toUnixMs(t: Time): int64 = + t.toUnix * 1000'i64 + int64(t.nanosecond div 1_000_000) + +proc fromUnixMs(ms: int64): Time = + let secs = ms div 1000 + let nanos = (ms mod 1000).int * 1_000_000 + initTime(secs, nanos) + +# --------------------------------------------------------------------------- +# UnacknowledgedMessage +# --------------------------------------------------------------------------- + +proc encodeUnacked(u: UnacknowledgedMessage): ProtoBuffer = + var pb = initProtoBuffer() + let msgPb = wire.encode(u.message) + pb.write(1, msgPb.buffer) + pb.write(2, uint64(u.sendTime.toUnixMs)) + pb.write(3, uint32(u.resendAttempts)) + pb.finish() + pb + +proc decodeUnacked(buf: seq[byte]): ProtobufResult[UnacknowledgedMessage] = + let pb = initProtoBuffer(buf) + var msgBytes: seq[byte] + if not ?pb.getField(1, msgBytes): + return err(ProtobufError.missingRequiredField("UnacknowledgedMessage.message")) + let msg = SdsMessage.decode(msgBytes).valueOr: + return err(ProtobufError.missingRequiredField("UnacknowledgedMessage.message")) + var sendMs: uint64 + if not ?pb.getField(2, sendMs): + return err(ProtobufError.missingRequiredField("UnacknowledgedMessage.sendTime")) + var attempts: uint32 + discard pb.getField(3, attempts) + ok( + UnacknowledgedMessage.init( + message = msg, + sendTime = fromUnixMs(int64(sendMs)), + resendAttempts = int(attempts), + ) + ) + +# --------------------------------------------------------------------------- +# IncomingMessage +# --------------------------------------------------------------------------- + +proc encodeIncoming(m: IncomingMessage): ProtoBuffer = + var pb = initProtoBuffer() + let msgPb = wire.encode(m.message) + pb.write(1, msgPb.buffer) + for dep in m.missingDeps: + pb.write(2, dep) # SdsMessageID is string + pb.finish() + pb + +proc decodeIncoming(buf: seq[byte]): ProtobufResult[IncomingMessage] = + let pb = initProtoBuffer(buf) + var msgBytes: seq[byte] + if not ?pb.getField(1, msgBytes): + return err(ProtobufError.missingRequiredField("IncomingMessage.message")) + let msg = SdsMessage.decode(msgBytes).valueOr: + return err(ProtobufError.missingRequiredField("IncomingMessage.message")) + var deps: seq[SdsMessageID] + discard pb.getRepeatedField(2, deps) + var depSet = initHashSet[SdsMessageID]() + for d in deps: + depSet.incl(d) + ok(IncomingMessage.init(message = msg, missingDeps = depSet)) + +# --------------------------------------------------------------------------- +# OutgoingRepairEntry / OutgoingRepairKV +# --------------------------------------------------------------------------- + +proc encodeOutRepairEntry(e: OutgoingRepairEntry): ProtoBuffer = + var pb = initProtoBuffer() + let histPb = wire.encodeHistoryEntry(e.outHistEntry) + pb.write(1, histPb.buffer) + pb.write(2, uint64(e.minTimeRepairReq.toUnixMs)) + pb.finish() + pb + +proc decodeOutRepairEntry(buf: seq[byte]): ProtobufResult[OutgoingRepairEntry] = + let pb = initProtoBuffer(buf) + var histBytes: seq[byte] + if not ?pb.getField(1, histBytes): + return err(ProtobufError.missingRequiredField("OutgoingRepairEntry.outHistEntry")) + let histPb = initProtoBuffer(histBytes) + let entry = ?wire.decodeHistoryEntry(histPb) + var ms: uint64 + if not ?pb.getField(2, ms): + return err(ProtobufError.missingRequiredField("OutgoingRepairEntry.minTimeRepairReq")) + ok( + OutgoingRepairEntry.init( + outHistEntry = entry, minTimeRepairReq = fromUnixMs(int64(ms)) + ) + ) + +proc encodeOutRepairKV(kv: OutgoingRepairKV): ProtoBuffer = + var pb = initProtoBuffer() + pb.write(1, kv.messageId) + let entryPb = encodeOutRepairEntry(kv.entry) + pb.write(2, entryPb.buffer) + pb.finish() + pb + +proc decodeOutRepairKV(buf: seq[byte]): ProtobufResult[OutgoingRepairKV] = + let pb = initProtoBuffer(buf) + var msgId: SdsMessageID + if not ?pb.getField(1, msgId): + return err(ProtobufError.missingRequiredField("OutgoingRepairKV.messageId")) + var entryBytes: seq[byte] + if not ?pb.getField(2, entryBytes): + return err(ProtobufError.missingRequiredField("OutgoingRepairKV.entry")) + let entry = ?decodeOutRepairEntry(entryBytes) + ok(OutgoingRepairKV(messageId: msgId, entry: entry)) + +# --------------------------------------------------------------------------- +# IncomingRepairEntry / IncomingRepairKV +# --------------------------------------------------------------------------- + +proc encodeInRepairEntry(e: IncomingRepairEntry): ProtoBuffer = + var pb = initProtoBuffer() + let histPb = wire.encodeHistoryEntry(e.inHistEntry) + pb.write(1, histPb.buffer) + pb.write(2, e.cachedMessage) + pb.write(3, uint64(e.minTimeRepairResp.toUnixMs)) + pb.finish() + pb + +proc decodeInRepairEntry(buf: seq[byte]): ProtobufResult[IncomingRepairEntry] = + let pb = initProtoBuffer(buf) + var histBytes: seq[byte] + if not ?pb.getField(1, histBytes): + return err(ProtobufError.missingRequiredField("IncomingRepairEntry.inHistEntry")) + let histPb = initProtoBuffer(histBytes) + let entry = ?wire.decodeHistoryEntry(histPb) + var cached: seq[byte] + if not ?pb.getField(2, cached): + return err(ProtobufError.missingRequiredField("IncomingRepairEntry.cachedMessage")) + var ms: uint64 + if not ?pb.getField(3, ms): + return err(ProtobufError.missingRequiredField("IncomingRepairEntry.minTimeRepairResp")) + ok( + IncomingRepairEntry.init( + inHistEntry = entry, + cachedMessage = cached, + minTimeRepairResp = fromUnixMs(int64(ms)), + ) + ) + +proc encodeInRepairKV(kv: IncomingRepairKV): ProtoBuffer = + var pb = initProtoBuffer() + pb.write(1, kv.messageId) + let entryPb = encodeInRepairEntry(kv.entry) + pb.write(2, entryPb.buffer) + pb.finish() + pb + +proc decodeInRepairKV(buf: seq[byte]): ProtobufResult[IncomingRepairKV] = + let pb = initProtoBuffer(buf) + var msgId: SdsMessageID + if not ?pb.getField(1, msgId): + return err(ProtobufError.missingRequiredField("IncomingRepairKV.messageId")) + var entryBytes: seq[byte] + if not ?pb.getField(2, entryBytes): + return err(ProtobufError.missingRequiredField("IncomingRepairKV.entry")) + let entry = ?decodeInRepairEntry(entryBytes) + ok(IncomingRepairKV(messageId: msgId, entry: entry)) + +# --------------------------------------------------------------------------- +# ChannelMeta (top-level snapshot) +# --------------------------------------------------------------------------- + +proc encode*(meta: ChannelMeta): ProtoBuffer = + var pb = initProtoBuffer() + pb.write(1, meta.schemaVersion) + pb.write(2, uint64(meta.lamportTimestamp)) + for u in meta.outgoingBuffer: + let entryPb = encodeUnacked(u) + pb.write(3, entryPb.buffer) + for m in meta.incomingBuffer: + let entryPb = encodeIncoming(m) + pb.write(4, entryPb.buffer) + for kv in meta.outgoingRepairBuffer: + let entryPb = encodeOutRepairKV(kv) + pb.write(5, entryPb.buffer) + for kv in meta.incomingRepairBuffer: + let entryPb = encodeInRepairKV(kv) + pb.write(6, entryPb.buffer) + pb.finish() + pb + +proc decode*(T: type ChannelMeta, buf: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buf) + var meta = ChannelMeta.init() + + var ver: uint32 + if not ?pb.getField(1, ver): + return err(ProtobufError.missingRequiredField("ChannelMeta.schemaVersion")) + if ver != ChannelMetaSchemaVersion: + # Per the contract: refuse loudly rather than silently truncating. + return err(ProtobufError.missingRequiredField( + "ChannelMeta.schemaVersion(unsupported)" + )) + meta.schemaVersion = ver + + var lts: uint64 + if not ?pb.getField(2, lts): + return err(ProtobufError.missingRequiredField("ChannelMeta.lamportTimestamp")) + meta.lamportTimestamp = int64(lts) + + var outBufs, inBufs, outRepBufs, inRepBufs: seq[seq[byte]] + discard pb.getRepeatedField(3, outBufs) + for b in outBufs: + meta.outgoingBuffer.add(?decodeUnacked(b)) + discard pb.getRepeatedField(4, inBufs) + for b in inBufs: + meta.incomingBuffer.add(?decodeIncoming(b)) + discard pb.getRepeatedField(5, outRepBufs) + for b in outRepBufs: + meta.outgoingRepairBuffer.add(?decodeOutRepairKV(b)) + discard pb.getRepeatedField(6, inRepBufs) + for b in inRepBufs: + meta.incomingRepairBuffer.add(?decodeInRepairKV(b)) + ok(meta) + +proc serialize*(meta: ChannelMeta): Result[seq[byte], ReliabilityError] = + ok(encode(meta).buffer) + +proc deserializeChannelMeta*( + data: seq[byte] +): Result[ChannelMeta, ReliabilityError] = + let m = ChannelMeta.decode(data).valueOr: + return err(ReliabilityError.reDeserializationError) + ok(m) + +# --------------------------------------------------------------------------- +# ChannelData (bootstrap payload) +# --------------------------------------------------------------------------- + +proc encode*(d: ChannelData): ProtoBuffer = + var pb = initProtoBuffer() + let metaPb = encode(d.meta) + pb.write(1, metaPb.buffer) + for m in d.messageHistory: + let msgPb = wire.encode(m) + pb.write(2, msgPb.buffer) + pb.finish() + pb + +proc decode*(T: type ChannelData, buf: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buf) + var d = ChannelData.init() + var metaBytes: seq[byte] + if not ?pb.getField(1, metaBytes): + return err(ProtobufError.missingRequiredField("ChannelData.meta")) + d.meta = ?ChannelMeta.decode(metaBytes) + var histBufs: seq[seq[byte]] + discard pb.getRepeatedField(2, histBufs) + for b in histBufs: + let m = SdsMessage.decode(b).valueOr: + return err(ProtobufError.missingRequiredField("ChannelData.messageHistory[i]")) + d.messageHistory.add(m) + ok(d) + +# --------------------------------------------------------------------------- +# HistoryUpdate +# --------------------------------------------------------------------------- + +proc encode*(u: HistoryUpdate): ProtoBuffer = + var pb = initProtoBuffer() + for m in u.append: + let msgPb = wire.encode(m) + pb.write(1, msgPb.buffer) + for id in u.evict: + pb.write(2, id) + pb.finish() + pb + +proc decode*(T: type HistoryUpdate, buf: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buf) + var u = HistoryUpdate.init() + var appBufs: seq[seq[byte]] + discard pb.getRepeatedField(1, appBufs) + for b in appBufs: + let m = SdsMessage.decode(b).valueOr: + return err(ProtobufError.missingRequiredField("HistoryUpdate.append[i]")) + u.append.add(m) + var ev: seq[SdsMessageID] + discard pb.getRepeatedField(2, ev) + u.evict = ev + ok(u) + +{.pop.} diff --git a/sds/types/channel_context.nim b/sds/types/channel_context.nim index 3f7bee6..b5455e7 100644 --- a/sds/types/channel_context.nim +++ b/sds/types/channel_context.nim @@ -1,4 +1,4 @@ -import std/tables +import std/[sets, tables] import ./sds_message_id import ./sds_message import ./rolling_bloom_filter @@ -23,6 +23,31 @@ type ChannelContext* = ref object ## SDS-R buffers outgoingRepairBuffer*: Table[SdsMessageID, OutgoingRepairEntry] incomingRepairBuffer*: Table[SdsMessageID, IncomingRepairEntry] + ## R2 pending-write queue for history (see PLAN §8 + PR #72 review). + ## When `updateHistory` fails, the failed (append, evict) batch is parked + ## here and merged with the next op's batch on the next `tryUpdateHistory` + ## call. Cleared on successful flush. NOT persisted — runtime-only state; + ## on a crash the in-memory `messageHistory` is also lost and the next + ## `loadChannel` brings whatever made it to disk. + ## + ## INVARIANT (relied on by the flush): every id in `pendingHistoryAppends` + ## is also present in `messageHistory`. The full `SdsMessage` is NOT + ## stored here — it is looked up from `messageHistory` at flush time. + ## Storing only the id avoids the ~1 KB-per-entry duplication of + ## SdsMessage that an OrderedTable would carry. + pendingHistoryAppends*: OrderedSet[SdsMessageID] + ## Pending appends, in insertion order so the on-disk log stays + ## oldest-first across retries. + pendingHistoryEvicts*: HashSet[SdsMessageID] + ## Pending evictions. Set semantics — evicting the same id twice is a + ## no-op. + ## + ## Merge rule with `pendingHistoryAppends`: **latest operation wins.** + ## Queuing an append cancels any pending evict for the same id; + ## queuing an evict cancels any pending append. This handles the + ## "evict-then-re-add" sequence correctly (e.g. SDS-R repair + ## re-delivers a message that was previously evicted while the + ## backend was unreachable). proc new*(T: type ChannelContext, bloomFilter: RollingBloomFilter): T = return T( @@ -33,4 +58,6 @@ proc new*(T: type ChannelContext, bloomFilter: RollingBloomFilter): T = incomingBuffer: initTable[SdsMessageID, IncomingMessage](), outgoingRepairBuffer: initTable[SdsMessageID, OutgoingRepairEntry](), incomingRepairBuffer: initTable[SdsMessageID, IncomingRepairEntry](), + pendingHistoryAppends: initOrderedSet[SdsMessageID](), + pendingHistoryEvicts: initHashSet[SdsMessageID](), ) diff --git a/sds/types/channel_meta.nim b/sds/types/channel_meta.nim new file mode 100644 index 0000000..0d1f1fb --- /dev/null +++ b/sds/types/channel_meta.nim @@ -0,0 +1,82 @@ +## Atomic snapshot types for the per-channel protocol state. +## +## These types replace the fine-grained mutation operations of the original +## Persistence interface with a single self-contained blob per channel. +## See PLAN_SNAPSHOT_PERSISTENCE.md §1 for the rationale, §6 for the codec +## choice, §7 for size estimates. +## +## Bloom filter is intentionally absent — rebuilt from the message log on +## bootstrap. Message history is also absent — persisted separately via +## `HistoryUpdate` because it is large and append-mostly. + +import ./sds_message_id +import ./sds_message +import ./unacknowledged_message +import ./incoming_message +import ./repair_entry +export + sds_message_id, sds_message, unacknowledged_message, incoming_message, + repair_entry + +const ChannelMetaSchemaVersion* = 1'u32 + ## On-disk format version for ChannelMeta. Decoders MUST refuse to load a + ## blob whose version they don't know how to interpret, rather than + ## silently truncating or zero-filling unknown fields. + +type + OutgoingRepairKV* = object + ## Flattened (key, value) entry from the in-memory + ## `outgoingRepairBuffer: Table[SdsMessageID, OutgoingRepairEntry]`. + ## Protobuf has no first-class map type in the minprotobuf subset we + ## use; even proto3 `map` is wire-encoded as repeated KV messages. + ## Flattening to a `seq[KV]` makes that shape explicit. + messageId*: SdsMessageID + entry*: OutgoingRepairEntry + + IncomingRepairKV* = object + ## Flattened (key, value) entry from + ## `incomingRepairBuffer: Table[SdsMessageID, IncomingRepairEntry]`. + messageId*: SdsMessageID + entry*: IncomingRepairEntry + + ChannelMeta* = object + ## Atomic snapshot of the fast-changing per-channel protocol state. + ## Persisted as one blob per `saveChannelMeta` call. The `Table`-backed + ## buffers in `ChannelContext` are flattened to `seq`s here for stable + ## serialization and deterministic ordering on disk. + schemaVersion*: uint32 + lamportTimestamp*: int64 + outgoingBuffer*: seq[UnacknowledgedMessage] + ## Sent-but-not-yet-acked. Order matches insertion order in + ## ChannelContext.outgoingBuffer; preserved on save/load. + incomingBuffer*: seq[IncomingMessage] + ## Received-but-not-yet-deliverable; key in memory is + ## `message.messageId`, so no KV wrapper is needed. + outgoingRepairBuffer*: seq[OutgoingRepairKV] + incomingRepairBuffer*: seq[IncomingRepairKV] + + ChannelData* = object + ## Returned by `loadChannel` on `getOrCreateChannel` bootstrap. + ## Carries everything needed to rebuild the in-memory `ChannelContext` + ## from a clean restart. + meta*: ChannelMeta + messageHistory*: seq[SdsMessage] + ## MUST be ordered oldest-first (lamportTimestamp ASC, tie-break + ## msg_id ASC). Bloom filter is rebuilt from this on load; FIFO + ## eviction at maxMessageHistory relies on this ordering. Backend + ## contract; the loader SHOULD validate. + +proc init*(T: type ChannelMeta): T = + ## Empty snapshot with current schema version. Used as the bootstrap + ## payload when no on-disk state exists for a channel. + T( + schemaVersion: ChannelMetaSchemaVersion, + lamportTimestamp: 0, + outgoingBuffer: @[], + incomingBuffer: @[], + outgoingRepairBuffer: @[], + incomingRepairBuffer: @[], + ) + +proc init*(T: type ChannelData): T = + T(meta: ChannelMeta.init(), messageHistory: @[]) diff --git a/sds/types/history_update.nim b/sds/types/history_update.nim new file mode 100644 index 0000000..422bbef --- /dev/null +++ b/sds/types/history_update.nim @@ -0,0 +1,30 @@ +## Combined append/evict payload for the persistence message log. +## +## One protocol operation may deliver multiple messages in sequence (e.g. +## `unwrapReceivedMessage` followed by a `processIncomingBuffer` cascade +## that unblocks several buffered messages), and may also evict the oldest +## entries past `maxMessageHistory` in the same operation. Bundling all of +## those into a single `HistoryUpdate` lets the persistence backend execute +## the append + evict as one atomic batch alongside the matching +## `saveChannelMeta` call. + +import ./sds_message_id +import ./sds_message +export sds_message_id, sds_message + +type HistoryUpdate* = object + ## When BOTH `append` and `evict` are empty, callers SHOULD skip the + ## persistence call entirely. The Persistence interface treats an + ## "empty" update as a no-op but the round-trip is not free. + append*: seq[SdsMessage] + ## New delivered messages, in delivery order. Order matters for the + ## backend's append-only log; nim-sds preserves causal ordering when + ## populating this list. + evict*: seq[SdsMessageID] + ## Oldest messages now past `maxMessageHistory`. Backend deletes by id. + +proc init*(T: type HistoryUpdate): T = + T(append: @[], evict: @[]) + +proc isEmpty*(u: HistoryUpdate): bool = + u.append.len == 0 and u.evict.len == 0 diff --git a/sds/types/persistence.nim b/sds/types/persistence.nim index 6ae34e9..6f5e459 100644 --- a/sds/types/persistence.nim +++ b/sds/types/persistence.nim @@ -1,163 +1,94 @@ -import chronos +## Snapshot-based persistence interface (5 procs). +## +## Each protocol op issues AT MOST one `saveChannelMeta` and one +## `updateHistory` call at the end of the op, under the channel lock. The +## meta blob is the complete current per-channel state (lamport clock, +## outgoing/incoming buffers, SDS-R repair buffers); the history update +## carries (append, evict) for the message log. Bloom filter is rebuilt +## from history on bootstrap, never persisted. +## +## Atomicity expectation: nim-sds issues `saveChannelMeta` and (when +## non-empty) `updateHistory` back-to-back with NO intervening +## `await`-of-other-work. The backend MAY treat the pair as one +## transaction. The pair is keyed on the same `channelId`. +## +## Failure policy: a failed `saveChannelMeta` or `updateHistory` MUST NOT +## abort the protocol op. The next op's save is fully self-contained and +## will re-synchronise on-disk state. See PLAN_SNAPSHOT_PERSISTENCE.md §8. +## `loadChannel` and `dropChannel` DO surface errors — they're the +## durability-intent ops. + +import chronos, results import ./sds_message_id -import ./sds_message -import ./unacknowledged_message -import ./incoming_message -import ./repair_entry -export - sds_message_id, sds_message, unacknowledged_message, incoming_message, repair_entry +import ./channel_meta +import ./history_update +export results, sds_message_id, channel_meta, history_update -## SDS state persistence interface (issue #64). -## -## Defines WHAT operations a persistence backend must provide. The actual -## storage technology (SQLite, encrypted file, in-memory) is supplied by the -## caller — nim-sds knows nothing about it. Every state-mutating proc in the -## protocol calls into one of these procs immediately after the in-memory -## change, so on-disk state stays in lockstep with in-memory state. -## -## All proc fields are async (return `Future`) so backends can do real I/O -## without blocking the Chronos event loop the manager runs on. -## -## Bloom filter is intentionally not persisted: it is rebuilt from the local -## history log on bootstrap. Async timers are likewise recomputed from the -## absolute timestamps stored in the repair buffer entries. +type Persistence* = object + ## Pluggable durability backend. Supplied at `newReliabilityManager` + ## construction time; defaults to `noOpPersistence()` when not given. -type - ChannelSnapshot* = object - ## Returned by `loadAllForChannel` on bootstrap. Carries the entire - ## per-channel state needed to repopulate a `ChannelContext`. The bloom - ## filter is NOT in the snapshot — callers rebuild it from `messageHistory`. - lamportTimestamp*: int64 - messageHistory*: seq[SdsMessage] - ## MUST be ordered oldest-first. FIFO eviction relies on insertion order; - ## skipping ORDER BY corrupts the log across restarts. - outgoingBuffer*: seq[UnacknowledgedMessage] - incomingBuffer*: seq[IncomingMessage] - outgoingRepairBuffer*: seq[(SdsMessageID, OutgoingRepairEntry)] - incomingRepairBuffer*: seq[(SdsMessageID, IncomingRepairEntry)] + saveChannelMeta*: proc( + channelId: SdsChannelID, meta: ChannelMeta + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + ## Persist the complete current per-channel snapshot. Idempotent: the + ## blob is the full state, so a missed write is recovered by any later + ## successful write. - Persistence* = object - ## Pluggable persistence contract. The caller supplies an instance of this - ## type at `newReliabilityManager` construction time. Each proc field is - ## invoked by nim-sds at the corresponding state-mutation point. - ## All fields are async; nim-sds awaits each call to keep on-disk and - ## in-memory state in lockstep without blocking the event loop. + updateHistory*: proc( + channelId: SdsChannelID, update: HistoryUpdate + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + ## Append newly-delivered messages and evict oldest ones past the + ## maxMessageHistory cap. Callers SHOULD skip this call entirely when + ## `update.isEmpty`. - # Per-channel lamport clock - saveLamport*: proc(channelId: SdsChannelID, lamport: int64): Future[void] {. - async: (raises: []), gcsafe - .} + loadChannel*: proc( + channelId: SdsChannelID + ): Future[Result[ChannelData, string]] {.async: (raises: []), gcsafe.} + ## Bootstrap on `getOrCreateChannel`. Returns the full prior state, or + ## an empty `ChannelData` if the channel is new on disk. Failure + ## propagates to the caller — bootstrap is a durability-intent op. - # Local log (delivered messages) - appendLogEntry*: proc(channelId: SdsChannelID, msg: SdsMessage): Future[void] {. - async: (raises: []), gcsafe - .} - removeLogEntry*: proc(channelId: SdsChannelID, msgId: SdsMessageID): Future[void] {. - async: (raises: []), gcsafe - .} - setRetrievalHint*: proc(msgId: SdsMessageID, hint: seq[byte]): Future[void] {. - async: (raises: []), gcsafe - .} + dropChannel*: proc( + channelId: SdsChannelID + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + ## Wipe all persisted state for a channel. Called by `removeChannel` / + ## `resetReliabilityManager`. Backends SHOULD execute atomically. + ## Failure propagates to the caller — the caller asked us to confirm a + ## disk wipe and we cannot silently lie. - # Outgoing unacknowledged buffer - saveOutgoing*: proc( - channelId: SdsChannelID, msg: UnacknowledgedMessage - ): Future[void] {.async: (raises: []), gcsafe.} - removeOutgoing*: proc(channelId: SdsChannelID, msgId: SdsMessageID): Future[void] {. - async: (raises: []), gcsafe - .} - - # Incoming dependency-waiting buffer - saveIncoming*: proc(channelId: SdsChannelID, msg: IncomingMessage): Future[void] {. - async: (raises: []), gcsafe - .} - removeIncoming*: proc(channelId: SdsChannelID, msgId: SdsMessageID): Future[void] {. - async: (raises: []), gcsafe - .} - - # SDS-R outgoing repair buffer - saveOutgoingRepair*: proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: OutgoingRepairEntry - ) {.async: (raises: []).} - removeOutgoingRepair*: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[void] {.async: (raises: []), gcsafe.} - - # SDS-R incoming repair buffer - saveIncomingRepair*: proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: IncomingRepairEntry - ) {.async: (raises: []).} - removeIncomingRepair*: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[void] {.async: (raises: []), gcsafe.} - - # Wipe all persisted state for a channel in one transactional call. - # Called by removeChannel / resetReliabilityManager. Backends should - # implement this atomically (e.g. one BEGIN/COMMIT) — a per-row loop on - # the nim-sds side would mean N fsyncs per drop. - dropChannel*: - proc(channelId: SdsChannelID): Future[void] {.async: (raises: []), gcsafe.} - - # Bootstrap on `addChannel` / `getOrCreateChannel`. - loadAllForChannel*: proc(channelId: SdsChannelID): Future[ChannelSnapshot] {. - async: (raises: []), gcsafe - .} + setRetrievalHint*: proc( + msgId: SdsMessageID, hint: seq[byte] + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + ## Record a retrieval hint for a message id. Called from + ## `getRecentHistoryEntries` when an application-supplied hint + ## provider returns a non-empty hint. Out-of-band from the + ## snapshot/history write path because hints are populated lazily + ## during read. Non-fatal on failure. proc noOpPersistence*(): Persistence = - ## Default backend that discards every write and returns an empty snapshot. - ## Used so existing callers (and tests) that don't care about durability - ## keep working without supplying a real backend. + ## Default backend: discards all writes, returns an empty snapshot on + ## load. Used when no real backend is supplied (existing tests and + ## non-durability-needing callers). Persistence( - saveLamport: proc(channelId: SdsChannelID, lamport: int64) {.async: (raises: []).} = - discard, - appendLogEntry: proc( - channelId: SdsChannelID, msg: SdsMessage - ) {.async: (raises: []).} = - discard, - removeLogEntry: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ) {.async: (raises: []).} = - discard, + saveChannelMeta: proc( + channelId: SdsChannelID, meta: ChannelMeta + ): Future[Result[void, string]] {.async: (raises: []).} = + ok(), + updateHistory: proc( + channelId: SdsChannelID, update: HistoryUpdate + ): Future[Result[void, string]] {.async: (raises: []).} = + ok(), + loadChannel: proc( + channelId: SdsChannelID + ): Future[Result[ChannelData, string]] {.async: (raises: []).} = + ok(ChannelData.init()), + dropChannel: proc( + channelId: SdsChannelID + ): Future[Result[void, string]] {.async: (raises: []).} = + ok(), setRetrievalHint: proc( msgId: SdsMessageID, hint: seq[byte] - ) {.async: (raises: []).} = - discard, - saveOutgoing: proc( - channelId: SdsChannelID, msg: UnacknowledgedMessage - ) {.async: (raises: []).} = - discard, - removeOutgoing: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ) {.async: (raises: []).} = - discard, - saveIncoming: proc( - channelId: SdsChannelID, msg: IncomingMessage - ) {.async: (raises: []).} = - discard, - removeIncoming: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ) {.async: (raises: []).} = - discard, - saveOutgoingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: OutgoingRepairEntry - ) {.async: (raises: []).} = - discard, - removeOutgoingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ) {.async: (raises: []).} = - discard, - saveIncomingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: IncomingRepairEntry - ) {.async: (raises: []).} = - discard, - removeIncomingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ) {.async: (raises: []).} = - discard, - dropChannel: proc(channelId: SdsChannelID) {.async: (raises: []).} = - discard, - loadAllForChannel: proc( - channelId: SdsChannelID - ): Future[ChannelSnapshot] {.async: (raises: []).} = - return ChannelSnapshot(), + ): Future[Result[void, string]] {.async: (raises: []).} = + ok(), ) diff --git a/sds/types/reliability_error.nim b/sds/types/reliability_error.nim index 43af2f7..71dc494 100644 --- a/sds/types/reliability_error.nim +++ b/sds/types/reliability_error.nim @@ -5,3 +5,4 @@ type ReliabilityError* {.pure.} = enum reSerializationError reDeserializationError reMessageTooLarge + rePersistenceError ## A persistence backend operation (read or write) failed. diff --git a/tests/in_memory_persistence.nim b/tests/in_memory_persistence.nim index b0834e8..087545e 100644 --- a/tests/in_memory_persistence.nim +++ b/tests/in_memory_persistence.nim @@ -1,12 +1,17 @@ -import std/tables +## Test-only Persistence backend backed by Nim tables. Adapts the +## snapshot-based `Persistence` interface onto a denormalised +## `InMemoryStore` shape so test assertions can inspect individual buffers +## (`store.outgoing`, `store.log`, etc.) directly. The adapter +## decomposes the meta blob on save and reconstructs it on load. +## +## `failingOps` injects backend failures. Op names match the `Persistence` +## field names: "saveChannelMeta", "updateHistory", "loadChannel", +## "dropChannel", "setRetrievalHint". + +import std/[tables, sets] import chronos import sds -## Test-only Persistence backend backed by Nim tables. Lets tests verify the -## full write → restart → read-back loop without depending on SQLite (or any -## real storage technology). Exposes the underlying store so tests can assert -## on what got saved. - type InMemoryStore* = ref object lamports*: Table[SdsChannelID, int64] log*: Table[SdsChannelID, OrderedTable[SdsMessageID, SdsMessage]] @@ -16,94 +21,95 @@ type InMemoryStore* = ref object outgoingRepair*: Table[SdsChannelID, OrderedTable[SdsMessageID, OutgoingRepairEntry]] incomingRepair*: Table[SdsChannelID, OrderedTable[SdsMessageID, IncomingRepairEntry]] dropChannelCalls*: Table[SdsChannelID, int] - ## Per-channel counter; lets tests assert dropChannel is invoked exactly - ## once per logical drop (not N times — see PR #66 review). + ## Per-channel counter; lets tests assert dropChannel is invoked + ## exactly once per logical drop. + failingOps*: HashSet[string] + ## Op names that should return an injected backend error. proc newInMemoryStore*(): InMemoryStore = - InMemoryStore() + InMemoryStore(failingOps: initHashSet[string]()) proc newInMemoryPersistence*(store: InMemoryStore): Persistence = Persistence( - saveLamport: proc(channelId: SdsChannelID, lamport: int64) {.async: (raises: []).} = - store.lamports[channelId] = lamport, - appendLogEntry: proc( - channelId: SdsChannelID, msg: SdsMessage - ) {.async: (raises: []).} = + saveChannelMeta: proc( + channelId: SdsChannelID, meta: ChannelMeta + ): Future[Result[void, string]] {.async: (raises: []).} = + if "saveChannelMeta" in store.failingOps: + return err("injected backend failure: saveChannelMeta") + {.cast(raises: []).}: + # Lamport. + store.lamports[channelId] = meta.lamportTimestamp + + # Outgoing buffer — replace existing rows wholesale (snapshot is + # the complete state, not a delta). + store.outgoing[channelId] = + initOrderedTable[SdsMessageID, UnacknowledgedMessage]() + for u in meta.outgoingBuffer: + store.outgoing[channelId][u.message.messageId] = u + + # Incoming buffer. + store.incoming[channelId] = + initOrderedTable[SdsMessageID, IncomingMessage]() + for m in meta.incomingBuffer: + store.incoming[channelId][m.message.messageId] = m + + # Repair buffers. + store.outgoingRepair[channelId] = + initOrderedTable[SdsMessageID, OutgoingRepairEntry]() + for kv in meta.outgoingRepairBuffer: + store.outgoingRepair[channelId][kv.messageId] = kv.entry + store.incomingRepair[channelId] = + initOrderedTable[SdsMessageID, IncomingRepairEntry]() + for kv in meta.incomingRepairBuffer: + store.incomingRepair[channelId][kv.messageId] = kv.entry + ok(), + updateHistory: proc( + channelId: SdsChannelID, update: HistoryUpdate + ): Future[Result[void, string]] {.async: (raises: []).} = + if "updateHistory" in store.failingOps: + return err("injected backend failure: updateHistory") {.cast(raises: []).}: if channelId notin store.log: store.log[channelId] = initOrderedTable[SdsMessageID, SdsMessage]() - store.log[channelId][msg.messageId] = msg, - removeLogEntry: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ) {.async: (raises: []).} = - {.cast(raises: []).}: - if channelId in store.log: - store.log[channelId].del(msgId) - , - setRetrievalHint: proc( - msgId: SdsMessageID, hint: seq[byte] - ) {.async: (raises: []).} = - store.hints[msgId] = hint, - saveOutgoing: proc( - channelId: SdsChannelID, msg: UnacknowledgedMessage - ) {.async: (raises: []).} = - {.cast(raises: []).}: - if channelId notin store.outgoing: - store.outgoing[channelId] = - initOrderedTable[SdsMessageID, UnacknowledgedMessage]() - store.outgoing[channelId][msg.message.messageId] = msg, - removeOutgoing: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ) {.async: (raises: []).} = + for m in update.append: + store.log[channelId][m.messageId] = m + for id in update.evict: + store.log[channelId].del(id) + ok(), + loadChannel: proc( + channelId: SdsChannelID + ): Future[Result[ChannelData, string]] {.async: (raises: []).} = + if "loadChannel" in store.failingOps: + return err("injected backend failure: loadChannel") {.cast(raises: []).}: + var data = ChannelData.init() + if channelId in store.lamports: + data.meta.lamportTimestamp = store.lamports[channelId] if channelId in store.outgoing: - store.outgoing[channelId].del(msgId) - , - saveIncoming: proc( - channelId: SdsChannelID, msg: IncomingMessage - ) {.async: (raises: []).} = - {.cast(raises: []).}: - if channelId notin store.incoming: - store.incoming[channelId] = initOrderedTable[SdsMessageID, IncomingMessage]() - store.incoming[channelId][msg.message.messageId] = msg, - removeIncoming: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ) {.async: (raises: []).} = - {.cast(raises: []).}: + for u in store.outgoing[channelId].values: + data.meta.outgoingBuffer.add(u) if channelId in store.incoming: - store.incoming[channelId].del(msgId) - , - saveOutgoingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: OutgoingRepairEntry - ) {.async: (raises: []).} = - {.cast(raises: []).}: - if channelId notin store.outgoingRepair: - store.outgoingRepair[channelId] = - initOrderedTable[SdsMessageID, OutgoingRepairEntry]() - store.outgoingRepair[channelId][msgId] = entry, - removeOutgoingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ) {.async: (raises: []).} = - {.cast(raises: []).}: + for m in store.incoming[channelId].values: + data.meta.incomingBuffer.add(m) if channelId in store.outgoingRepair: - store.outgoingRepair[channelId].del(msgId) - , - saveIncomingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: IncomingRepairEntry - ) {.async: (raises: []).} = - {.cast(raises: []).}: - if channelId notin store.incomingRepair: - store.incomingRepair[channelId] = - initOrderedTable[SdsMessageID, IncomingRepairEntry]() - store.incomingRepair[channelId][msgId] = entry, - removeIncomingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ) {.async: (raises: []).} = - {.cast(raises: []).}: + for id, e in store.outgoingRepair[channelId].pairs: + data.meta.outgoingRepairBuffer.add( + OutgoingRepairKV(messageId: id, entry: e) + ) if channelId in store.incomingRepair: - store.incomingRepair[channelId].del(msgId) - , - dropChannel: proc(channelId: SdsChannelID) {.async: (raises: []).} = + for id, e in store.incomingRepair[channelId].pairs: + data.meta.incomingRepairBuffer.add( + IncomingRepairKV(messageId: id, entry: e) + ) + if channelId in store.log: + for m in store.log[channelId].values: + data.messageHistory.add(m) + return ok(data), + dropChannel: proc( + channelId: SdsChannelID + ): Future[Result[void, string]] {.async: (raises: []).} = + if "dropChannel" in store.failingOps: + return err("injected backend failure: dropChannel") {.cast(raises: []).}: store.lamports.del(channelId) store.log.del(channelId) @@ -112,28 +118,13 @@ proc newInMemoryPersistence*(store: InMemoryStore): Persistence = store.outgoingRepair.del(channelId) store.incomingRepair.del(channelId) store.dropChannelCalls[channelId] = - store.dropChannelCalls.getOrDefault(channelId) + 1, - loadAllForChannel: proc( - channelId: SdsChannelID - ): Future[ChannelSnapshot] {.async: (raises: []).} = - {.cast(raises: []).}: - var snap = ChannelSnapshot() - if channelId in store.lamports: - snap.lamportTimestamp = store.lamports[channelId] - if channelId in store.log: - for msg in store.log[channelId].values: - snap.messageHistory.add(msg) - if channelId in store.outgoing: - for unack in store.outgoing[channelId].values: - snap.outgoingBuffer.add(unack) - if channelId in store.incoming: - for incoming in store.incoming[channelId].values: - snap.incomingBuffer.add(incoming) - if channelId in store.outgoingRepair: - for msgId, entry in store.outgoingRepair[channelId]: - snap.outgoingRepairBuffer.add((msgId, entry)) - if channelId in store.incomingRepair: - for msgId, entry in store.incomingRepair[channelId]: - snap.incomingRepairBuffer.add((msgId, entry)) - return snap, + store.dropChannelCalls.getOrDefault(channelId) + 1 + ok(), + setRetrievalHint: proc( + msgId: SdsMessageID, hint: seq[byte] + ): Future[Result[void, string]] {.async: (raises: []).} = + if "setRetrievalHint" in store.failingOps: + return err("injected backend failure: setRetrievalHint") + store.hints[msgId] = hint + ok(), ) diff --git a/tests/test_persistence.nim b/tests/test_persistence.nim index 89f085f..613e174 100644 --- a/tests/test_persistence.nim +++ b/tests/test_persistence.nim @@ -8,11 +8,23 @@ converter toParticipantID(s: string): SdsParticipantID = const testChannel = "testChannel" +# Helper: build a ReliabilityManager wired only to the V2 in-memory +# persistence (no legacy backend). Mirrors how production callers will +# construct the manager once phase 3 deletes the legacy field. +proc newV2Manager( + store: InMemoryStore, config = defaultConfig() +): ReliabilityManager = + newReliabilityManager( + participantId = "alice", + config = config, + persistence = newInMemoryPersistence(store), + ) + .get() + suite "Persistence: write → restart → read-back": asyncTest "outgoing buffer survives restart": let store = newInMemoryStore() - let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(participantId = "alice", persistence = p1).get() + let rm1 = newV2Manager(store) check (await rm1.ensureChannel(testChannel)).isOk() let wrapped = await rm1.wrapOutgoingMessage(@[1.byte, 2, 3], "msg-1", testChannel) check wrapped.isOk() @@ -20,9 +32,8 @@ suite "Persistence: write → restart → read-back": check "msg-1" in store.outgoing[testChannel] await rm1.cleanup() - # Simulate restart: fresh manager, same backend - let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(participantId = "alice", persistence = p2).get() + # Simulate restart: fresh manager, same backend. + let rm2 = newV2Manager(store) check (await rm2.ensureChannel(testChannel)).isOk() let buf = await rm2.getOutgoingBuffer(testChannel) check buf.len == 1 @@ -31,22 +42,25 @@ suite "Persistence: write → restart → read-back": asyncTest "lamport clock survives restart": let store = newInMemoryStore() - let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(participantId = "alice", persistence = p1).get() + let rm1 = newV2Manager(store) check (await rm1.ensureChannel(testChannel)).isOk() - await rm1.updateLamportTimestamp(42, testChannel) - check store.lamports[testChannel] == 43 # max(42, 0) + 1 + check (await rm1.updateLamportTimestamp(42, testChannel)).isOk() + # updateLamportTimestamp is now pure; the mutation is persisted by the + # next op-end save. Drive a wrap to force a trySaveMeta. + discard await rm1.wrapOutgoingMessage(@[byte(1)], "tick", testChannel) + # max(42,0)+1 then max(getTime().toUnix, 43)+1; whatever wrap sets is + # what we'll see. We just assert it stayed monotonic. + check store.lamports[testChannel] >= 43 + let savedLamport = store.lamports[testChannel] await rm1.cleanup() - let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(participantId = "alice", persistence = p2).get() + let rm2 = newV2Manager(store) check (await rm2.ensureChannel(testChannel)).isOk() - check rm2.channels[testChannel].lamportTimestamp == 43 + check rm2.channels[testChannel].lamportTimestamp == savedLamport asyncTest "delivered messages survive restart and rebuild bloom": let store = newInMemoryStore() - let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(participantId = "alice", persistence = p1).get() + let rm1 = newV2Manager(store) check (await rm1.ensureChannel(testChannel)).isOk() let msg = SdsMessage.init( messageId = "delivered-1", @@ -57,28 +71,30 @@ suite "Persistence: write → restart → read-back": bloomFilter = @[], senderId = "alice", ) - await rm1.addToHistory(msg, testChannel) + check (await rm1.addToHistory(msg, testChannel)).isOk() + # New design: addToHistory queues; tryUpdateHistory flushes. Tests + # that drive addToHistory directly must follow with an explicit flush + # (in production, the public protocol op issues the flush at op end). + await rm1.tryUpdateHistory(testChannel) check store.log[testChannel].len == 1 await rm1.cleanup() - let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(participantId = "alice", persistence = p2).get() + let rm2 = newV2Manager(store) check (await rm2.ensureChannel(testChannel)).isOk() let ch = rm2.channels[testChannel] check ch.messageHistory.len == 1 check "delivered-1" in ch.messageHistory - # Bloom filter rebuilt from log on bootstrap + # Bloom filter rebuilt from log on bootstrap. check ch.bloomFilter.contains("delivered-1") asyncTest "ack removes outgoing entry from persistence": let store = newInMemoryStore() - let p = newInMemoryPersistence(store) - let rm = newReliabilityManager(participantId = "alice", persistence = p).get() + let rm = newV2Manager(store) check (await rm.ensureChannel(testChannel)).isOk() discard await rm.wrapOutgoingMessage(@[1.byte], "msg-x", testChannel) check "msg-x" in store.outgoing[testChannel] - # Synthesize an incoming message that ACKs msg-x via causal history + # Synthesize an incoming message that ACKs msg-x via causal history. let ackMsg = SdsMessage.init( messageId = "ack-bearer", lamportTimestamp = 5, @@ -95,10 +111,9 @@ suite "Persistence: write → restart → read-back": asyncTest "removeChannel issues exactly one dropChannel call and wipes all state": # Regression for PR #66 review: removal must be a single transactional - # drop, not N per-row removes — otherwise SQLite eats N fsyncs per drop. + # drop, not N per-row removes. let store = newInMemoryStore() - let p = newInMemoryPersistence(store) - let rm = newReliabilityManager(participantId = "alice", persistence = p).get() + let rm = newV2Manager(store) check (await rm.ensureChannel(testChannel)).isOk() discard await rm.wrapOutgoingMessage(@[1.byte], "msg-r", testChannel) check store.outgoing[testChannel].len == 1 @@ -116,7 +131,7 @@ suite "Persistence: write → restart → read-back": asyncTest "noOpPersistence keeps existing manager working": let rm = newReliabilityManager(participantId = "alice").get() - # default no-op persistence + # default no-op persistence (both legacy and V2) check (await rm.ensureChannel(testChannel)).isOk() let wrapped = await rm.wrapOutgoingMessage(@[1.byte], "msg-n", testChannel) check wrapped.isOk() @@ -126,8 +141,7 @@ suite "Persistence: write → restart → read-back": asyncTest "continue operating after restart: lamport stays monotonic": let store = newInMemoryStore() - let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(participantId = "alice", persistence = p1).get() + let rm1 = newV2Manager(store) check (await rm1.ensureChannel(testChannel)).isOk() discard await rm1.wrapOutgoingMessage(@[1.byte], "m1", testChannel) let lamportAfterSession1 = store.lamports[testChannel] @@ -135,8 +149,7 @@ suite "Persistence: write → restart → read-back": await rm1.cleanup() # Restart and send another message — lamport must not regress. - let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(participantId = "alice", persistence = p2).get() + let rm2 = newV2Manager(store) check (await rm2.ensureChannel(testChannel)).isOk() check rm2.channels[testChannel].lamportTimestamp == lamportAfterSession1 discard await rm2.wrapOutgoingMessage(@[2.byte], "m2", testChannel) @@ -148,16 +161,13 @@ suite "Persistence: write → restart → read-back": asyncTest "multiple restart cycles preserve state": let store = newInMemoryStore() for i in 1 .. 3: - let p = newInMemoryPersistence(store) - let rm = newReliabilityManager(participantId = "alice", persistence = p).get() + let rm = newV2Manager(store) check (await rm.ensureChannel(testChannel)).isOk() discard await rm.wrapOutgoingMessage(@[byte(i)], "m" & $i, testChannel) await rm.cleanup() # Final session: all three messages must be in the buffer. - let pFinal = newInMemoryPersistence(store) - let rmFinal = - newReliabilityManager(participantId = "alice", persistence = pFinal).get() + let rmFinal = newV2Manager(store) check (await rmFinal.ensureChannel(testChannel)).isOk() let buf = await rmFinal.getOutgoingBuffer(testChannel) check buf.len == 3 @@ -171,8 +181,7 @@ suite "Persistence: write → restart → read-back": asyncTest "incoming dep-waiting buffer survives restart with missingDeps intact": let store = newInMemoryStore() - let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(participantId = "alice", persistence = p1).get() + let rm1 = newV2Manager(store) check (await rm1.ensureChannel(testChannel)).isOk() # Receive a message whose causal-history references an unknown predecessor. @@ -191,8 +200,7 @@ suite "Persistence: write → restart → read-back": await rm1.cleanup() # Restart — buffered message and its missing-deps set must be back. - let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(participantId = "alice", persistence = p2).get() + let rm2 = newV2Manager(store) check (await rm2.ensureChannel(testChannel)).isOk() let inbuf = await rm2.getIncomingBuffer(testChannel) check "msg-with-deps" in inbuf @@ -200,11 +208,8 @@ suite "Persistence: write → restart → read-back": await rm2.cleanup() asyncTest "removeChannel + recreate does not inherit stale lamport": - # Regression: dropChannel must wipe the lamport row; otherwise a recreate - # of the same channelId after restart picks up the old timestamp. let store = newInMemoryStore() - let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(participantId = "alice", persistence = p1).get() + let rm1 = newV2Manager(store) check (await rm1.ensureChannel(testChannel)).isOk() discard await rm1.wrapOutgoingMessage(@[1.byte], "m-old", testChannel) check store.lamports[testChannel] > 0 @@ -213,8 +218,7 @@ suite "Persistence: write → restart → read-back": await rm1.cleanup() # Recreate the same channelId after a restart — must start fresh. - let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(participantId = "alice", persistence = p2).get() + let rm2 = newV2Manager(store) check (await rm2.ensureChannel(testChannel)).isOk() check rm2.channels[testChannel].lamportTimestamp == 0 let buf = await rm2.getOutgoingBuffer(testChannel) @@ -223,11 +227,9 @@ suite "Persistence: write → restart → read-back": asyncTest "SDS-R outgoing repair buffer survives restart with absolute t_req_at": let store = newInMemoryStore() - let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(participantId = "alice", persistence = p1).get() + let rm1 = newV2Manager(store) check (await rm1.ensureChannel(testChannel)).isOk() - # Receive a message that references an unknown dep — triggers SDS-R repair. let depMsg = SdsMessage.init( messageId = "msg-needs-repair", lamportTimestamp = 5, @@ -244,13 +246,16 @@ suite "Persistence: write → restart → read-back": check originalTReqAt.toUnix > 0 await rm1.cleanup() - # Restart — repair entry must be back with the SAME absolute time, not "now". - let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(participantId = "alice", persistence = p2).get() + # Restart — repair entry must be back with the SAME absolute time. + # Codec serialises Time as int64 unix milliseconds (PLAN §1.5), so the + # restored Time may differ by sub-millisecond precision from the + # original. Compare at second resolution which is what the protocol + # actually relies on. + let rm2 = newV2Manager(store) check (await rm2.ensureChannel(testChannel)).isOk() let buf = rm2.channels[testChannel].outgoingRepairBuffer check "missing-dep" in buf - check buf["missing-dep"].minTimeRepairReq == originalTReqAt + check buf["missing-dep"].minTimeRepairReq.toUnix == originalTReqAt.toUnix await rm2.cleanup() asyncTest "FIFO eviction state survives restart": @@ -259,11 +264,7 @@ suite "Persistence: write → restart → read-back": smallCfg.maxMessageHistory = 3 smallCfg.bloomFilterCapacity = 3 - let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager( - participantId = "alice", config = smallCfg, persistence = p1 - ) - .get() + let rm1 = newV2Manager(store, smallCfg) check (await rm1.ensureChannel(testChannel)).isOk() # Add 5 delivered messages — first 2 should be evicted by FIFO. for i in 1 .. 5: @@ -276,18 +277,15 @@ suite "Persistence: write → restart → read-back": bloomFilter = @[], senderId = "alice", ) - await rm1.addToHistory(m, testChannel) + check (await rm1.addToHistory(m, testChannel)).isOk() + await rm1.tryUpdateHistory(testChannel) check store.log[testChannel].len == 3 check "m1" notin store.log[testChannel] check "m2" notin store.log[testChannel] await rm1.cleanup() # Restart — evicted entries must NOT come back; survivors keep order. - let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager( - participantId = "alice", config = smallCfg, persistence = p2 - ) - .get() + let rm2 = newV2Manager(store, smallCfg) check (await rm2.ensureChannel(testChannel)).isOk() let history = rm2.channels[testChannel].messageHistory check history.len == 3 @@ -295,7 +293,7 @@ suite "Persistence: write → restart → read-back": check "m2" notin history check "m3" in history check "m5" in history - # FIFO continues correctly after restart: adding m6 evicts m3, not a stale entry. + # FIFO continues correctly after restart: adding m6 evicts m3. let m6 = SdsMessage.init( messageId = "m6", lamportTimestamp = 6, @@ -305,15 +303,15 @@ suite "Persistence: write → restart → read-back": bloomFilter = @[], senderId = "alice", ) - await rm2.addToHistory(m6, testChannel) + check (await rm2.addToHistory(m6, testChannel)).isOk() + await rm2.tryUpdateHistory(testChannel) check "m3" notin store.log[testChannel] check "m6" in store.log[testChannel] await rm2.cleanup() asyncTest "dep-clear cascade resumes correctly across a restart": let store = newInMemoryStore() - let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(participantId = "alice", persistence = p1).get() + let rm1 = newV2Manager(store) check (await rm1.ensureChannel(testChannel)).isOk() # Receive c (deps on b), then b (deps on a). Both must buffer. @@ -341,9 +339,8 @@ suite "Persistence: write → restart → read-back": check "b" in store.incoming[testChannel] await rm1.cleanup() - # Restart — both still buffered, with intact missingDeps. - let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(participantId = "alice", persistence = p2).get() + # Restart — both still buffered with intact missingDeps. + let rm2 = newV2Manager(store) check (await rm2.ensureChannel(testChannel)).isOk() let inbuf = await rm2.getIncomingBuffer(testChannel) check "c" in inbuf @@ -364,7 +361,196 @@ suite "Persistence: write → restart → read-back": check "a" in history check "b" in history check "c" in history - # Buffer should be drained. let inbufFinal = await rm2.getIncomingBuffer(testChannel) check inbufFinal.len == 0 await rm2.cleanup() + +suite "Persistence: failure policy": + asyncTest "loadChannel failure surfaces as rePersistenceError on bootstrap": + # Bootstrap durability is the semantic intent of getOrCreateChannel — + # the caller asked us to materialise a channel and we can't do that + # without knowing prior state. So this op DOES propagate err on load + # failure (PLAN §8). + let store = newInMemoryStore() + store.failingOps.incl("loadChannel") + let rm = newReliabilityManager( + participantId = "alice", persistence = newInMemoryPersistence(store) + ) + .get() + let res = await rm.ensureChannel(testChannel) + check res.isErr() + check res.error == ReliabilityError.rePersistenceError + + asyncTest "saveChannelMeta failure during send does NOT surface — non-fatal policy": + # PLAN §8: persistence failures during foreground ops are logged but + # MUST NOT abort the op. The in-memory state is the source of truth; + # the next op's snapshot will re-synchronise on-disk state. This test + # is the inversion of the legacy "write failure surfaces as err" — + # the new policy is deliberate. + let store = newInMemoryStore() + let rm = newReliabilityManager( + participantId = "alice", persistence = newInMemoryPersistence(store) + ) + .get() + check (await rm.ensureChannel(testChannel)).isOk() + store.failingOps.incl("saveChannelMeta") + let res = await rm.wrapOutgoingMessage(@[byte(1)], "m1", testChannel) + # Op succeeds: bytes were produced, protocol state is correct in + # memory, the FFI caller is unaffected. + check res.isOk() + # In-memory state is correct even though disk save was rejected. + let buf = await rm.getOutgoingBuffer(testChannel) + check buf.len == 1 + check buf[0].message.messageId == "m1" + # Recovery: clear the failure, drive another op, disk catches up. + store.failingOps.excl("saveChannelMeta") + let res2 = await rm.wrapOutgoingMessage(@[byte(2)], "m2", testChannel) + check res2.isOk() + check "m1" in store.outgoing[testChannel] + check "m2" in store.outgoing[testChannel] + + asyncTest "updateHistory failure during send does NOT surface — non-fatal policy": + # Same policy applied to the history-update path. + let store = newInMemoryStore() + let rm = newReliabilityManager( + participantId = "alice", persistence = newInMemoryPersistence(store) + ) + .get() + check (await rm.ensureChannel(testChannel)).isOk() + store.failingOps.incl("updateHistory") + let res = await rm.wrapOutgoingMessage(@[byte(1)], "m1", testChannel) + check res.isOk() + check rm.channels[testChannel].messageHistory.len == 1 + + asyncTest "updateHistory failure is retried via R2 pending-write queue": + # Fix for PR #72 review comment #1: a failed history write must not + # silently drop the delta. The pending-write queue parks failed + # entries and retries them on the next op end. Once the backend + # recovers, the disk catches up automatically — no caller action + # needed, no err surfaced. + let store = newInMemoryStore() + let rm = newReliabilityManager( + participantId = "alice", persistence = newInMemoryPersistence(store) + ) + .get() + check (await rm.ensureChannel(testChannel)).isOk() + + # Failure 1: send m1 while updateHistory is broken. + store.failingOps.incl("updateHistory") + discard await rm.wrapOutgoingMessage(@[byte(1)], "m1", testChannel) + # In-memory state is correct; disk has no log entry for m1 yet. + check rm.channels[testChannel].messageHistory.len == 1 + check testChannel notin store.log or "m1" notin store.log[testChannel] + # Pending queue should be holding m1 for retry. + check rm.channels[testChannel].pendingHistoryAppends.len == 1 + check "m1" in rm.channels[testChannel].pendingHistoryAppends + + # Failure 2: send m2 while still broken. Pending should now hold both. + discard await rm.wrapOutgoingMessage(@[byte(2)], "m2", testChannel) + check rm.channels[testChannel].pendingHistoryAppends.len == 2 + check "m1" in rm.channels[testChannel].pendingHistoryAppends + check "m2" in rm.channels[testChannel].pendingHistoryAppends + # Still nothing on disk. + check testChannel notin store.log or store.log[testChannel].len == 0 + + # Recovery: clear the backend failure, send m3. The op-end flush + # should drain ALL pending entries plus the new one in a single call. + store.failingOps.excl("updateHistory") + discard await rm.wrapOutgoingMessage(@[byte(3)], "m3", testChannel) + check rm.channels[testChannel].pendingHistoryAppends.len == 0 + check "m1" in store.log[testChannel] + check "m2" in store.log[testChannel] + check "m3" in store.log[testChannel] + + asyncTest "evict-then-re-add merge rule preserves the re-added message on disk": + # Regression: with the original "evict-wins" merge rule, a message + # re-added (e.g. via SDS-R repair) after being evicted during a + # backend outage would have its append silently dropped because the + # id was still in pendingHistoryEvicts. The "latest-wins" rule fixes + # this — the re-add cancels the pending evict. + let store = newInMemoryStore() + var smallCfg = defaultConfig() + smallCfg.maxMessageHistory = 2 + smallCfg.bloomFilterCapacity = 2 + let rm = newReliabilityManager( + participantId = "alice", + config = smallCfg, + persistence = newInMemoryPersistence(store), + ) + .get() + check (await rm.ensureChannel(testChannel)).isOk() + + proc mkMsg(id: string, ts: int64): SdsMessage = + SdsMessage.init( + messageId = id, + lamportTimestamp = ts, + causalHistory = @[], + channelId = testChannel, + content = @[byte(ts)], + bloomFilter = @[], + senderId = "alice", + ) + + # Break the backend, then fill the channel past maxMessageHistory so + # m1 gets evicted while we have no successful flush yet. + store.failingOps.incl("updateHistory") + check (await rm.addToHistory(mkMsg("m1", 1), testChannel)).isOk() + await rm.tryUpdateHistory(testChannel) # fails — m1 queued + check (await rm.addToHistory(mkMsg("m2", 2), testChannel)).isOk() + check (await rm.addToHistory(mkMsg("m3", 3), testChannel)).isOk() + # m1 evicted by FIFO; pending should now have m2,m3 as appends and m1 as evict. + check "m1" notin rm.channels[testChannel].messageHistory + check "m1" in rm.channels[testChannel].pendingHistoryEvicts + check "m1" notin rm.channels[testChannel].pendingHistoryAppends + + # SDS-R-style re-delivery of m1. With latest-wins, this MUST cancel + # the pending evict and re-queue the append. + check (await rm.addToHistory(mkMsg("m1", 4), testChannel)).isOk() + check "m1" in rm.channels[testChannel].messageHistory + check "m1" notin rm.channels[testChannel].pendingHistoryEvicts + check "m1" in rm.channels[testChannel].pendingHistoryAppends + + # Recover and flush. m1 must land on disk. + store.failingOps.excl("updateHistory") + await rm.tryUpdateHistory(testChannel) + check "m1" in store.log[testChannel] + + asyncTest "pending queue survives idle ops (flush on next op without history changes)": + # Even if the next op makes no history changes of its own, it must + # still flush the pending queue at op end — otherwise a failed write + # could sit indefinitely if the application only ever does + # mark-deps-met-style ops after a failure. + let store = newInMemoryStore() + let rm = newReliabilityManager( + participantId = "alice", persistence = newInMemoryPersistence(store) + ) + .get() + check (await rm.ensureChannel(testChannel)).isOk() + + # Stage a pending entry by failing one send. + store.failingOps.incl("updateHistory") + discard await rm.wrapOutgoingMessage(@[byte(1)], "m1", testChannel) + check rm.channels[testChannel].pendingHistoryAppends.len == 1 + + # Now clear the failure and drive a markDependenciesMet on a no-op + # input — it has no history changes of its own but its op-end flush + # must still retry the queue. + store.failingOps.excl("updateHistory") + check (await rm.markDependenciesMet(@["nonexistent"], testChannel)).isOk() + check rm.channels[testChannel].pendingHistoryAppends.len == 0 + check "m1" in store.log[testChannel] + + asyncTest "dropChannel failure during removeChannel surfaces as rePersistenceError": + # Durability is the semantic intent of removeChannel — the caller + # asked us to confirm a disk wipe. We cannot silently lie. So this op + # DOES propagate err on failure (PLAN §8). + let store = newInMemoryStore() + let rm = newReliabilityManager( + participantId = "alice", persistence = newInMemoryPersistence(store) + ) + .get() + check (await rm.ensureChannel(testChannel)).isOk() + store.failingOps.incl("dropChannel") + let res = await rm.removeChannel(testChannel) + check res.isErr() + check res.error == ReliabilityError.rePersistenceError diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index 9c14fe5..915189e 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -1550,7 +1550,7 @@ suite "SDS-R: Lifecycle and State": discard await rm.wrapOutgoingMessage(@[byte(1)], "m1", testChannel) discard await rm.wrapOutgoingMessage(@[byte(2)], "m2", testChannel) - let entries = await rm.getRecentHistoryEntries(10, testChannel) + let entries = (await rm.getRecentHistoryEntries(10, testChannel)).get() check: entries.len == 2 entries[0].senderId == "alice" @@ -1704,7 +1704,7 @@ suite "SDS-R: Repair Sweep": minTimeRepairResp: getTime() + initDuration(minutes = 10), # far future ) - await rm.runRepairSweep() + check (await rm.runRepairSweep()).isOk() check: fireCount == 1 @@ -1733,7 +1733,7 @@ suite "SDS-R: Repair Sweep": minTimeRepairReq: getTime(), ) - await rm.runRepairSweep() + check (await rm.runRepairSweep()).isOk() check: "m-stale" notin channel.outgoingRepairBuffer @@ -1751,7 +1751,7 @@ suite "SDS-R: Repair Sweep": onRepairReady = proc(bytes: seq[byte], ch: SdsChannelID) {.gcsafe.} = fireCount += 1, ) - await rm.runRepairSweep() + check (await rm.runRepairSweep()).isOk() check fireCount == 0 # --- Multi-participant in-process bus for integration tests --------------- @@ -1890,7 +1890,7 @@ suite "SDS-R: Multi-Participant Integration": # Force alice's tResp to past just to be safe (it's already 0 for self), # then run her sweep. She rebroadcasts M1. alice.forceIncomingExpired("m1") - await alice.runRepairSweep() + check (await alice.runRepairSweep()).isOk() await bus.drain() # Bob now has M1 and M2 delivered. @@ -1922,7 +1922,7 @@ suite "SDS-R: Multi-Participant Integration": # Alice fires first (T_resp =0 for self). Her rebroadcast should cancel Carol's # pending entry when Carol receives the rebroadcast. alice.forceIncomingExpired("m1") - await alice.runRepairSweep() + check (await alice.runRepairSweep()).isOk() await bus.drain() # Carol's pending response must have been cleared by the dedup-path cleanup. @@ -1930,7 +1930,7 @@ suite "SDS-R: Multi-Participant Integration": # Even if we now force-run Carol's sweep, nothing should fire. let wireCountBefore = bus.wireLog.len - await carol.runRepairSweep() + check (await carol.runRepairSweep()).isOk() await bus.drain() check bus.wireLog.len == wireCountBefore diff --git a/tests/test_snapshot_codec.nim b/tests/test_snapshot_codec.nim new file mode 100644 index 0000000..be52365 --- /dev/null +++ b/tests/test_snapshot_codec.nim @@ -0,0 +1,235 @@ +## Round-trip tests for the snapshot persistence codec. +## Each `encode` → `decode` cycle must preserve every field exactly. + +import std/[times, sets, unittest] +import results +import ../sds/snapshot_codec +import + ../sds/types/[ + sds_message, sds_message_id, history_entry, unacknowledged_message, + incoming_message, repair_entry, + ] + +converter toParticipantID(s: string): SdsParticipantID = + s.SdsParticipantID + +proc mkMsg(id: string, ts: int64 = 1, content: seq[byte] = @[]): SdsMessage = + SdsMessage.init( + messageId = id, + lamportTimestamp = ts, + causalHistory = @[], + channelId = "chan", + content = content, + bloomFilter = @[], + senderId = "alice", + repairRequest = @[], + ) + +proc mkHistEntry(id: string): HistoryEntry = + HistoryEntry.init(messageId = id, senderId = "alice") + +suite "snapshot codec — ChannelMeta": + test "empty meta round-trips": + let m = ChannelMeta.init() + let buf = encode(m).buffer + let dec = ChannelMeta.decode(buf).get() + check: + dec.schemaVersion == ChannelMetaSchemaVersion + dec.lamportTimestamp == 0 + dec.outgoingBuffer.len == 0 + dec.incomingBuffer.len == 0 + dec.outgoingRepairBuffer.len == 0 + dec.incomingRepairBuffer.len == 0 + + test "meta with lamport and single outgoing entry": + var m = ChannelMeta.init() + m.lamportTimestamp = 42 + m.outgoingBuffer.add( + UnacknowledgedMessage.init( + message = mkMsg("m1", 42, @[1.byte, 2, 3]), + sendTime = fromUnix(1_700_000_000), + resendAttempts = 2, + ) + ) + let buf = encode(m).buffer + let dec = ChannelMeta.decode(buf).get() + check: + dec.lamportTimestamp == 42 + dec.outgoingBuffer.len == 1 + dec.outgoingBuffer[0].message.messageId == "m1" + dec.outgoingBuffer[0].message.content == @[1.byte, 2, 3] + dec.outgoingBuffer[0].resendAttempts == 2 + dec.outgoingBuffer[0].sendTime.toUnix == 1_700_000_000 + + test "meta with incoming entry carrying missing deps": + var m = ChannelMeta.init() + var deps = initHashSet[SdsMessageID]() + deps.incl("dep1") + deps.incl("dep2") + m.incomingBuffer.add( + IncomingMessage.init(message = mkMsg("m2"), missingDeps = deps) + ) + let buf = encode(m).buffer + let dec = ChannelMeta.decode(buf).get() + check: + dec.incomingBuffer.len == 1 + dec.incomingBuffer[0].message.messageId == "m2" + dec.incomingBuffer[0].missingDeps == deps + + test "meta with both repair buffers populated": + var m = ChannelMeta.init() + m.outgoingRepairBuffer.add( + OutgoingRepairKV( + messageId: "missing1", + entry: OutgoingRepairEntry.init( + outHistEntry = mkHistEntry("missing1"), + minTimeRepairReq = fromUnix(1_700_000_100), + ), + ) + ) + m.incomingRepairBuffer.add( + IncomingRepairKV( + messageId: "requested1", + entry: IncomingRepairEntry.init( + inHistEntry = mkHistEntry("requested1"), + cachedMessage = @[9.byte, 8, 7, 6], + minTimeRepairResp = fromUnix(1_700_000_200), + ), + ) + ) + let buf = encode(m).buffer + let dec = ChannelMeta.decode(buf).get() + check: + dec.outgoingRepairBuffer.len == 1 + dec.outgoingRepairBuffer[0].messageId == "missing1" + dec.outgoingRepairBuffer[0].entry.minTimeRepairReq.toUnix == + 1_700_000_100 + dec.incomingRepairBuffer.len == 1 + dec.incomingRepairBuffer[0].messageId == "requested1" + dec.incomingRepairBuffer[0].entry.cachedMessage == @[9.byte, 8, 7, 6] + dec.incomingRepairBuffer[0].entry.minTimeRepairResp.toUnix == + 1_700_000_200 + + test "fully-populated meta — multiple entries each buffer": + var m = ChannelMeta.init() + m.lamportTimestamp = 999 + for i in 0 ..< 5: + m.outgoingBuffer.add( + UnacknowledgedMessage.init( + message = mkMsg("o" & $i, int64(i), @[byte(i)]), + sendTime = fromUnix(1_700_000_000 + i.int64), + resendAttempts = i, + ) + ) + for i in 0 ..< 3: + var deps = initHashSet[SdsMessageID]() + deps.incl("dep" & $i) + m.incomingBuffer.add( + IncomingMessage.init( + message = mkMsg("i" & $i, int64(100 + i)), missingDeps = deps + ) + ) + for i in 0 ..< 4: + m.outgoingRepairBuffer.add( + OutgoingRepairKV( + messageId: "or" & $i, + entry: OutgoingRepairEntry.init( + outHistEntry = mkHistEntry("or" & $i), + minTimeRepairReq = fromUnix(1_700_000_300 + i.int64), + ), + ) + ) + for i in 0 ..< 2: + m.incomingRepairBuffer.add( + IncomingRepairKV( + messageId: "ir" & $i, + entry: IncomingRepairEntry.init( + inHistEntry = mkHistEntry("ir" & $i), + cachedMessage = @[byte(i), byte(i + 1)], + minTimeRepairResp = fromUnix(1_700_000_400 + i.int64), + ), + ) + ) + let buf = encode(m).buffer + let dec = ChannelMeta.decode(buf).get() + check: + dec.lamportTimestamp == 999 + dec.outgoingBuffer.len == 5 + dec.incomingBuffer.len == 3 + dec.outgoingRepairBuffer.len == 4 + dec.incomingRepairBuffer.len == 2 + dec.outgoingBuffer[4].message.messageId == "o4" + dec.outgoingBuffer[4].resendAttempts == 4 + dec.outgoingRepairBuffer[3].messageId == "or3" + dec.incomingRepairBuffer[1].entry.cachedMessage == @[1.byte, 2] + + test "decoder rejects unknown schemaVersion": + var m = ChannelMeta.init() + m.schemaVersion = 999'u32 + let buf = encode(m).buffer + check ChannelMeta.decode(buf).isErr + +suite "snapshot codec — ChannelData": + test "empty channel data round-trips": + let d = ChannelData.init() + let buf = encode(d).buffer + let dec = ChannelData.decode(buf).get() + check: + dec.meta.schemaVersion == ChannelMetaSchemaVersion + dec.messageHistory.len == 0 + + test "channel data with meta and history preserves order": + var d = ChannelData.init() + d.meta.lamportTimestamp = 17 + d.messageHistory.add(mkMsg("h1", 1)) + d.messageHistory.add(mkMsg("h2", 2)) + d.messageHistory.add(mkMsg("h3", 3)) + let buf = encode(d).buffer + let dec = ChannelData.decode(buf).get() + check: + dec.meta.lamportTimestamp == 17 + dec.messageHistory.len == 3 + dec.messageHistory[0].messageId == "h1" + dec.messageHistory[1].messageId == "h2" + dec.messageHistory[2].messageId == "h3" + +suite "snapshot codec — HistoryUpdate": + test "empty update reports isEmpty (callers skip persistence)": + # By contract (HistoryUpdate doc): when both append and evict are + # empty, callers MUST skip the persistence call entirely. The codec + # is not required to round-trip an empty update — minprotobuf's + # finish() refuses an empty buffer, by design. + let u = HistoryUpdate.init() + check u.isEmpty + + test "append-only update": + var u = HistoryUpdate.init() + u.append.add(mkMsg("a1")) + u.append.add(mkMsg("a2")) + let buf = encode(u).buffer + let dec = HistoryUpdate.decode(buf).get() + check: + dec.append.len == 2 + dec.append[0].messageId == "a1" + dec.append[1].messageId == "a2" + dec.evict.len == 0 + + test "evict-only update": + var u = HistoryUpdate.init() + u.evict = @["e1", "e2", "e3"] + let buf = encode(u).buffer + let dec = HistoryUpdate.decode(buf).get() + check: + dec.append.len == 0 + dec.evict == @["e1", "e2", "e3"] + + test "mixed append + evict update": + var u = HistoryUpdate.init() + u.append.add(mkMsg("new")) + u.evict = @["old1", "old2"] + let buf = encode(u).buffer + let dec = HistoryUpdate.decode(buf).get() + check: + dec.append.len == 1 + dec.append[0].messageId == "new" + dec.evict == @["old1", "old2"]