mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-07-02 22:10:13 +00:00
Tackle review result of #72
This commit is contained in:
parent
abc36f61aa
commit
67ccdaf9e3
@ -1,7 +1,7 @@
|
||||
<!-- gitnexus:start -->
|
||||
# GitNexus — Code Intelligence
|
||||
|
||||
This project is indexed by GitNexus as **nim-sds** (1100 symbols, 1820 relationships, 66 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
|
||||
This project is indexed by GitNexus as **nim-sds** (1039 symbols, 1731 relationships, 62 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
|
||||
|
||||
> If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first.
|
||||
|
||||
|
||||
@ -166,7 +166,7 @@ If using Nix, also recalculate the fixed-output hash in `nix/deps.nix` after upd
|
||||
<!-- gitnexus:start -->
|
||||
# GitNexus — Code Intelligence
|
||||
|
||||
This project is indexed by GitNexus as **nim-sds** (1100 symbols, 1820 relationships, 66 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
|
||||
This project is indexed by GitNexus as **nim-sds** (1039 symbols, 1731 relationships, 62 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely.
|
||||
|
||||
> If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first.
|
||||
|
||||
|
||||
2
sds.nim
2
sds.nim
@ -106,7 +106,7 @@ proc wrapOutgoingMessage*(
|
||||
return err(error)
|
||||
|
||||
let bfResult = serializeBloomFilter(channel.bloomFilter.filter)
|
||||
if bfResult.isErr:
|
||||
if bfResult.isErr():
|
||||
error "Failed to serialize bloom filter", channelId = channelId
|
||||
return err(ReliabilityError.reSerializationError)
|
||||
|
||||
|
||||
@ -15,22 +15,7 @@ export
|
||||
proc defaultConfig*(): ReliabilityConfig =
|
||||
return ReliabilityConfig.init()
|
||||
|
||||
proc reliabilityErr*(detail: string): ReliabilityError {.gcsafe, raises: [].} =
|
||||
## Maps a backend-supplied persistence error string onto the
|
||||
## `rePersistenceError` enum value. The enum carries no payload, so the
|
||||
## original detail is logged here — this is the single point where a
|
||||
## persistence failure is recorded, while the enum value travels up the
|
||||
## `Result` chain to the public API caller, who decides what to do.
|
||||
##
|
||||
## With the snapshot-based Persistence interface, most protocol ops no
|
||||
## longer propagate persistence errors at all — they log and continue
|
||||
## (see PLAN_SNAPSHOT_PERSISTENCE.md §8). This helper is still used by
|
||||
## the durability-intent ops (removeChannel, resetReliabilityManager,
|
||||
## getOrCreateChannel) that retain err-on-failure semantics.
|
||||
warn "persistence operation failed", detail = detail
|
||||
ReliabilityError.rePersistenceError
|
||||
|
||||
proc snapshotMeta*(channel: ChannelContext): ChannelMeta {.gcsafe, raises: [].} =
|
||||
proc snapshotMeta(channel: ChannelContext): ChannelMeta {.gcsafe, raises: [].} =
|
||||
## Captures the current in-memory state of a `ChannelContext` as a
|
||||
## `ChannelMeta` blob, suitable for `Persistence.saveChannelMeta`.
|
||||
##
|
||||
@ -39,16 +24,17 @@ proc snapshotMeta*(channel: ChannelContext): ChannelMeta {.gcsafe, raises: [].}
|
||||
## (see PLAN §6). The bloom filter and message history are intentionally
|
||||
## excluded — the former is rebuilt from the latter on bootstrap, and
|
||||
## the latter is persisted separately via `updateHistory`.
|
||||
result = ChannelMeta.init()
|
||||
result.lamportTimestamp = channel.lamportTimestamp
|
||||
var meta = ChannelMeta.init()
|
||||
meta.lamportTimestamp = channel.lamportTimestamp
|
||||
for u in channel.outgoingBuffer:
|
||||
result.outgoingBuffer.add(u)
|
||||
meta.outgoingBuffer.add(u)
|
||||
for _, m in channel.incomingBuffer.pairs:
|
||||
result.incomingBuffer.add(m)
|
||||
meta.incomingBuffer.add(m)
|
||||
for id, e in channel.outgoingRepairBuffer.pairs:
|
||||
result.outgoingRepairBuffer.add(OutgoingRepairKV(messageId: id, entry: e))
|
||||
meta.outgoingRepairBuffer.add(OutgoingRepairKV(messageId: id, entry: e))
|
||||
for id, e in channel.incomingRepairBuffer.pairs:
|
||||
result.incomingRepairBuffer.add(IncomingRepairKV(messageId: id, entry: e))
|
||||
meta.incomingRepairBuffer.add(IncomingRepairKV(messageId: id, entry: e))
|
||||
return meta
|
||||
|
||||
proc trySaveMeta*(
|
||||
rm: ReliabilityManager, channelId: SdsChannelID, channel: ChannelContext
|
||||
@ -59,12 +45,11 @@ proc trySaveMeta*(
|
||||
##
|
||||
## This helper is the single point where snapshot-save failures are
|
||||
## logged; callers do not need to handle the Result.
|
||||
let res = await rm.persistence.saveChannelMeta(channelId, snapshotMeta(channel))
|
||||
if res.isErr:
|
||||
(await rm.persistence.saveChannelMeta(channelId, snapshotMeta(channel))).isOkOr:
|
||||
warn "snapshot save failed; in-memory state authoritative, next op will retry",
|
||||
channelId = channelId, detail = res.error
|
||||
channelId = channelId, detail = error
|
||||
|
||||
proc queueHistoryAppend*(channel: ChannelContext, msgId: SdsMessageID) =
|
||||
proc queueHistoryAppend(channel: ChannelContext, msgId: SdsMessageID) =
|
||||
## Push an append onto the pending history queue. Only the id is
|
||||
## stored — the full SdsMessage is looked up from `messageHistory` at
|
||||
## flush time (invariant: every queued id is present in messageHistory).
|
||||
@ -76,7 +61,7 @@ proc queueHistoryAppend*(channel: ChannelContext, msgId: SdsMessageID) =
|
||||
channel.pendingHistoryEvicts.excl(msgId)
|
||||
channel.pendingHistoryAppends.incl(msgId)
|
||||
|
||||
proc queueHistoryEvict*(channel: ChannelContext, msgId: SdsMessageID) =
|
||||
proc queueHistoryEvict(channel: ChannelContext, msgId: SdsMessageID) =
|
||||
## Push an evict onto the pending history queue. Merge rule symmetric
|
||||
## with `queueHistoryAppend`: cancels any pending append for the same
|
||||
## id (the just-evicted message no longer needs to be persisted as an
|
||||
@ -119,8 +104,7 @@ proc tryUpdateHistory*(
|
||||
except KeyError:
|
||||
return # checked `in` above; unreachable, but tables can raise per spec
|
||||
|
||||
if channel.pendingHistoryAppends.len == 0 and
|
||||
channel.pendingHistoryEvicts.len == 0:
|
||||
if channel.pendingHistoryAppends.len == 0 and channel.pendingHistoryEvicts.len == 0:
|
||||
return # nothing to flush — no round-trip cost
|
||||
|
||||
var batch = HistoryUpdate.init()
|
||||
@ -151,8 +135,7 @@ proc tryUpdateHistory*(
|
||||
detail = res.error
|
||||
if channel.pendingHistoryAppends.len > rm.config.maxMessageHistory:
|
||||
warn "pending history queue exceeds maxMessageHistory; backend may be stuck",
|
||||
channelId = channelId,
|
||||
pendingAppends = channel.pendingHistoryAppends.len
|
||||
channelId = channelId, pendingAppends = channel.pendingHistoryAppends.len
|
||||
|
||||
proc dropChannelFromPersistence*(
|
||||
rm: ReliabilityManager, channelId: SdsChannelID
|
||||
@ -165,7 +148,8 @@ proc dropChannelFromPersistence*(
|
||||
## err on failure (durability is the semantic intent — the caller asked
|
||||
## us to confirm a disk wipe; we cannot silently lie). See PLAN §8.
|
||||
(await rm.persistence.dropChannel(channelId)).isOkOr:
|
||||
return err(reliabilityErr(error))
|
||||
warn "persistence operation failed", cause = error
|
||||
return err(ReliabilityError.rePersistenceError)
|
||||
ok()
|
||||
|
||||
proc cleanup*(rm: ReliabilityManager) {.async: (raises: []).} =
|
||||
@ -348,10 +332,9 @@ proc getRecentHistoryEntries*(
|
||||
# Phase 2B: best-effort hint persistence via V2. Non-fatal —
|
||||
# hints are an optimisation; a missing hint just means the
|
||||
# peer falls back to slower retrieval.
|
||||
let hintRes = await rm.persistence.setRetrievalHint(
|
||||
msgId, entry.retrievalHint
|
||||
)
|
||||
if hintRes.isErr:
|
||||
let hintRes =
|
||||
await rm.persistence.setRetrievalHint(msgId, entry.retrievalHint)
|
||||
if hintRes.isErr():
|
||||
warn "retrieval hint save failed; continuing",
|
||||
msgId = msgId, detail = hintRes.error
|
||||
entry.senderId = channel.messageHistory[msgId].senderId
|
||||
@ -456,7 +439,9 @@ proc getOrCreateChannel*(
|
||||
)
|
||||
)
|
||||
let data = (await rm.persistence.loadChannel(channelId)).valueOr:
|
||||
return err(reliabilityErr(error))
|
||||
warn "persistence operation failed", cause = error
|
||||
return err(ReliabilityError.rePersistenceError)
|
||||
|
||||
channel.lamportTimestamp = data.meta.lamportTimestamp
|
||||
# Backend contract: messageHistory MUST be ordered oldest-first.
|
||||
# If a backend violates this, FIFO eviction breaks across restarts.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user