diff --git a/logos_delivery/channels/reliable_channel.nim b/logos_delivery/channels/reliable_channel.nim index de4365a3d..5a7ab24d4 100644 --- a/logos_delivery/channels/reliable_channel.nim +++ b/logos_delivery/channels/reliable_channel.nim @@ -403,8 +403,18 @@ proc onMessageReceived( ## SDS returns every payload deliverable now, in causal order — the ## message itself plus any parked segments it released. Empty = consumed - ## by SDS; `err` = not a decodable SDS envelope. Both drop here. + ## by SDS (parked or duplicate). `err` is a real ingress failure here: the + ## marker/contentTopic filter already ran, so surface it as an error event + ## rather than dropping it silently. let deliverable = (await self.sdsHandler.handleIncoming(plaintextBytes)).valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: RequestId(""), + messageHash: messageHash, + error: "SDS handleIncoming failed: " & error, + ), + ) return for content in deliverable: self.reportReceived(content) diff --git a/logos_delivery/channels/scalable_data_sync/scalable_data_sync.nim b/logos_delivery/channels/scalable_data_sync/scalable_data_sync.nim index 975e02461..8da0c8530 100644 --- a/logos_delivery/channels/scalable_data_sync/scalable_data_sync.nim +++ b/logos_delivery/channels/scalable_data_sync/scalable_data_sync.nim @@ -9,10 +9,10 @@ {.push raises: [].} import std/[options, tables] +from std/times import initDuration, getTime, toUnix, nanosecond import results, chronos, chronicles import nimcrypto/keccak import stew/byteutils -from std/times import initDuration, getTime, toUnix, nanosecond import sds import message as sds_message @@ -27,8 +27,10 @@ const DefaultAcknowledgementTimeoutMs* = 5_000 DefaultMaxRetransmissions* = 5 DefaultCausalHistorySize* = 2 - MaxPendingContent = 1024 + MaxPendingContent = 32 ## Bound on segments parked while their causal dependencies are missing. + ## Kept small on purpose: at ~100KiB max per segment, 32 caps the stash + ## at ~3MiB per channel. Raise only if a real backlog need shows up. type SdsConfig* = object @@ -43,7 +45,7 @@ type ## Invoked with a full SDS envelope to rebroadcast (SDS-R repair). SdsHandler* = ref object - rm: ReliabilityManager + reliabilityManager: ReliabilityManager channelId: SdsChannelID pendingContent: OrderedTable[SdsMessageID, seq[byte]] ## Segments parked until their causal dependencies arrive. @@ -74,9 +76,11 @@ proc computeMessageId(self: SdsHandler, payload: seq[byte]): SdsMessageID = proc installCallbacks(self: SdsHandler) = ## Direct field assignment is race-free here: no periodic task or protocol ## op has started yet. - self.rm.onMessageReady = proc( + self.reliabilityManager.onMessageReady = proc( messageId: SdsMessageID, channelId: SdsChannelID ) {.gcsafe.} = + ## An SDS "message" is one channel segment here — each segment is wrapped + ## into its own SDS message — so this is effectively `onSegmentReady`. ## Fires during unwrap, under the manager lock — must stay synchronous. ## Collect only; `handleIncoming` delivers after the direct content. ## The manager owns a single channel, so `channelId` is always ours; the @@ -86,12 +90,12 @@ proc installCallbacks(self: SdsHandler) = self.released.add(self.pendingContent.getOrDefault(messageId)) self.pendingContent.del(messageId) - self.rm.onMessageSent = proc( + self.reliabilityManager.onMessageSent = proc( messageId: SdsMessageID, channelId: SdsChannelID ) {.gcsafe.} = debug "SDS message acknowledged", channelId, messageId - self.rm.onMissingDependencies = proc( + self.reliabilityManager.onMissingDependencies = proc( messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID ) {.gcsafe.} = ## Recovery via SDS sync / SDS-R for now; targeted store fetch by @@ -99,7 +103,9 @@ proc installCallbacks(self: SdsHandler) = debug "SDS message has missing dependencies", channelId, messageId, missing = missingDeps.len - self.rm.onRepairReady = proc(message: seq[byte], channelId: SdsChannelID) {.gcsafe.} = + self.reliabilityManager.onRepairReady = proc( + message: seq[byte], channelId: SdsChannelID + ) {.gcsafe.} = if not self.onRebroadcast.isNil(): self.onRebroadcast(message) @@ -120,7 +126,7 @@ proc new*( participantId, reliabilityConfig, config.persistence.get(noOpPersistence()) ) let handler = T( - rm: rm, + reliabilityManager: rm, channelId: channelId, pendingContent: initOrderedTable[SdsMessageID, seq[byte]](), released: @[], @@ -135,11 +141,11 @@ proc start*(self: SdsHandler) = ## lazily on first use: `wrapOutgoing` and `handleIncoming` both ensure ## the channel, and `handleIncoming` loads before its duplicate check so a ## replay right after a restart is still caught. - self.rm.startPeriodicTasks() + self.reliabilityManager.startPeriodicTasks() proc stop*(self: SdsHandler) {.async: (raises: []).} = ## Cancels the background loops. Persisted state is left intact. - await self.rm.cleanup() + await self.reliabilityManager.cleanup() proc wrapOutgoing*( self: SdsHandler, payload: seq[byte] @@ -147,7 +153,7 @@ proc wrapOutgoing*( ## Wraps a segment with reliability metadata and registers it in the SDS ## outgoing buffer awaiting end-to-end acknowledgement. let wrapped = ( - await self.rm.wrapOutgoingMessage( + await self.reliabilityManager.wrapOutgoingMessage( payload, self.computeMessageId(payload), self.channelId ) ).valueOr: @@ -160,7 +166,7 @@ proc handleIncoming*( ## Returns the payloads deliverable now, in causal order. Empty when SDS ## consumed the message; `err` when the bytes are not an SDS envelope. let msg = deserializeMessage(wire).valueOr: - return err("SDS deserialization failed") + return err("SDS deserialization failed: " & $error) ## Pre-filter: `unwrapReceivedMessage` auto-creates the channel it sees on ## the wire, so foreign traffic must not reach it. @@ -169,48 +175,63 @@ proc handleIncoming*( channelId = self.channelId, wireChannelId = msg.channelId return ok(newSeq[seq[byte]]()) + ## Only the lock acquisition can raise (CancelledError); the unwrap work + ## below is `raises: []`, so the try stays scoped to exactly the acquire. try: await self.ingressLock.acquire() - try: - ## Load persisted state before the duplicate check, so a replay right - ## after a restart is not re-delivered. Idempotent, cheap once loaded. - (await self.rm.ensureChannel(self.channelId)).isOkOr: - return err("SDS ensureChannel failed: " & $error) + except CancelledError: + return err("SDS handleIncoming cancelled before acquiring ingress lock") - ## The unwrap result does not distinguish first delivery from - ## duplicate, so capture delivered-before up front. - let ctx = self.rm.channels.getOrDefault(self.channelId) - let isDuplicate = not ctx.isNil() and msg.messageId in ctx.messageHistory + ## Funnel every unwrap outcome into `res` so the lock is released once on + ## the tail path, where `releaseIngressLock` can surface its own error. + var res: Result[seq[seq[byte]], string] + block ingress: + ## Load persisted state before the duplicate check, so a replay right + ## after a restart is not re-delivered. Idempotent, cheap once loaded. + (await self.reliabilityManager.ensureChannel(self.channelId)).isOkOr: + res = err("SDS ensureChannel failed: " & $error) + break ingress - self.released.setLen(0) - let unwrapped = (await self.rm.unwrapReceivedMessage(wire)).valueOr: - return err("SDS unwrap failed: " & $error) + ## The unwrap result does not distinguish first delivery from + ## duplicate, so capture delivered-before up front. + let ctx = self.reliabilityManager.channels.getOrDefault(self.channelId) + let isDuplicate = not ctx.isNil() and msg.messageId in ctx.messageHistory - if isDuplicate: - return ok(newSeq[seq[byte]]()) + self.released.setLen(0) + let unwrapped = (await self.reliabilityManager.unwrapReceivedMessage(wire)).valueOr: + res = err("SDS unwrap failed: " & $error) + break ingress - if unwrapped.missingDeps.len > 0: - if self.pendingContent.len >= MaxPendingContent: - var oldest: SdsMessageID - for k in self.pendingContent.keys: - oldest = k - break - self.pendingContent.del(oldest) - warn "SDS pending-content stash full, dropping oldest entry", - channelId = self.channelId, dropped = oldest - self.pendingContent[msg.messageId] = unwrapped.message - return ok(newSeq[seq[byte]]()) + if isDuplicate: + res = ok(newSeq[seq[byte]]()) + break ingress - var deliverable = newSeq[seq[byte]]() - if unwrapped.message.len > 0: - ## Empty content is sync traffic: causal metadata only. - deliverable.add(unwrapped.message) - deliverable.add(self.released) - self.released.setLen(0) - return ok(deliverable) - finally: - self.ingressLock.release() - except CatchableError: - return err("SDS handleIncoming failed: " & getCurrentExceptionMsg()) + if unwrapped.missingDeps.len > 0: + if self.pendingContent.len >= MaxPendingContent: + var oldest: SdsMessageID + for k in self.pendingContent.keys: + oldest = k + break + self.pendingContent.del(oldest) + warn "SDS pending-content stash full, dropping oldest entry", + channelId = self.channelId, dropped = oldest + self.pendingContent[msg.messageId] = unwrapped.message + res = ok(newSeq[seq[byte]]()) + break ingress + + var deliverable = newSeq[seq[byte]]() + if unwrapped.message.len > 0: + ## Empty content is sync traffic: causal metadata only. + deliverable.add(unwrapped.message) + deliverable.add(self.released) + self.released.setLen(0) + res = ok(deliverable) + + try: + self.ingressLock.release() + except AsyncLockError as e: + return err("SDS ingress lock release failed: " & e.msg) + + return res {.pop.}