mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-06-26 11:29:28 +00:00
parent
c3090fb62f
commit
066838aa03
@ -403,8 +403,18 @@ proc onMessageReceived(
|
||||
|
||||
## SDS returns every payload deliverable now, in causal order — the
|
||||
## message itself plus any parked segments it released. Empty = consumed
|
||||
## by SDS; `err` = not a decodable SDS envelope. Both drop here.
|
||||
## by SDS (parked or duplicate). `err` is a real ingress failure here: the
|
||||
## marker/contentTopic filter already ran, so surface it as an error event
|
||||
## rather than dropping it silently.
|
||||
let deliverable = (await self.sdsHandler.handleIncoming(plaintextBytes)).valueOr:
|
||||
MessageErrorEvent.emit(
|
||||
self.brokerCtx,
|
||||
MessageErrorEvent(
|
||||
requestId: RequestId(""),
|
||||
messageHash: messageHash,
|
||||
error: "SDS handleIncoming failed: " & error,
|
||||
),
|
||||
)
|
||||
return
|
||||
for content in deliverable:
|
||||
self.reportReceived(content)
|
||||
|
||||
@ -9,10 +9,10 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[options, tables]
|
||||
from std/times import initDuration, getTime, toUnix, nanosecond
|
||||
import results, chronos, chronicles
|
||||
import nimcrypto/keccak
|
||||
import stew/byteutils
|
||||
from std/times import initDuration, getTime, toUnix, nanosecond
|
||||
|
||||
import sds
|
||||
import message as sds_message
|
||||
@ -27,8 +27,10 @@ const
|
||||
DefaultAcknowledgementTimeoutMs* = 5_000
|
||||
DefaultMaxRetransmissions* = 5
|
||||
DefaultCausalHistorySize* = 2
|
||||
MaxPendingContent = 1024
|
||||
MaxPendingContent = 32
|
||||
## Bound on segments parked while their causal dependencies are missing.
|
||||
## Kept small on purpose: at ~100KiB max per segment, 32 caps the stash
|
||||
## at ~3MiB per channel. Raise only if a real backlog need shows up.
|
||||
|
||||
type
|
||||
SdsConfig* = object
|
||||
@ -43,7 +45,7 @@ type
|
||||
## Invoked with a full SDS envelope to rebroadcast (SDS-R repair).
|
||||
|
||||
SdsHandler* = ref object
|
||||
rm: ReliabilityManager
|
||||
reliabilityManager: ReliabilityManager
|
||||
channelId: SdsChannelID
|
||||
pendingContent: OrderedTable[SdsMessageID, seq[byte]]
|
||||
## Segments parked until their causal dependencies arrive.
|
||||
@ -74,9 +76,11 @@ proc computeMessageId(self: SdsHandler, payload: seq[byte]): SdsMessageID =
|
||||
proc installCallbacks(self: SdsHandler) =
|
||||
## Direct field assignment is race-free here: no periodic task or protocol
|
||||
## op has started yet.
|
||||
self.rm.onMessageReady = proc(
|
||||
self.reliabilityManager.onMessageReady = proc(
|
||||
messageId: SdsMessageID, channelId: SdsChannelID
|
||||
) {.gcsafe.} =
|
||||
## An SDS "message" is one channel segment here — each segment is wrapped
|
||||
## into its own SDS message — so this is effectively `onSegmentReady`.
|
||||
## Fires during unwrap, under the manager lock — must stay synchronous.
|
||||
## Collect only; `handleIncoming` delivers after the direct content.
|
||||
## The manager owns a single channel, so `channelId` is always ours; the
|
||||
@ -86,12 +90,12 @@ proc installCallbacks(self: SdsHandler) =
|
||||
self.released.add(self.pendingContent.getOrDefault(messageId))
|
||||
self.pendingContent.del(messageId)
|
||||
|
||||
self.rm.onMessageSent = proc(
|
||||
self.reliabilityManager.onMessageSent = proc(
|
||||
messageId: SdsMessageID, channelId: SdsChannelID
|
||||
) {.gcsafe.} =
|
||||
debug "SDS message acknowledged", channelId, messageId
|
||||
|
||||
self.rm.onMissingDependencies = proc(
|
||||
self.reliabilityManager.onMissingDependencies = proc(
|
||||
messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID
|
||||
) {.gcsafe.} =
|
||||
## Recovery via SDS sync / SDS-R for now; targeted store fetch by
|
||||
@ -99,7 +103,9 @@ proc installCallbacks(self: SdsHandler) =
|
||||
debug "SDS message has missing dependencies",
|
||||
channelId, messageId, missing = missingDeps.len
|
||||
|
||||
self.rm.onRepairReady = proc(message: seq[byte], channelId: SdsChannelID) {.gcsafe.} =
|
||||
self.reliabilityManager.onRepairReady = proc(
|
||||
message: seq[byte], channelId: SdsChannelID
|
||||
) {.gcsafe.} =
|
||||
if not self.onRebroadcast.isNil():
|
||||
self.onRebroadcast(message)
|
||||
|
||||
@ -120,7 +126,7 @@ proc new*(
|
||||
participantId, reliabilityConfig, config.persistence.get(noOpPersistence())
|
||||
)
|
||||
let handler = T(
|
||||
rm: rm,
|
||||
reliabilityManager: rm,
|
||||
channelId: channelId,
|
||||
pendingContent: initOrderedTable[SdsMessageID, seq[byte]](),
|
||||
released: @[],
|
||||
@ -135,11 +141,11 @@ proc start*(self: SdsHandler) =
|
||||
## lazily on first use: `wrapOutgoing` and `handleIncoming` both ensure
|
||||
## the channel, and `handleIncoming` loads before its duplicate check so a
|
||||
## replay right after a restart is still caught.
|
||||
self.rm.startPeriodicTasks()
|
||||
self.reliabilityManager.startPeriodicTasks()
|
||||
|
||||
proc stop*(self: SdsHandler) {.async: (raises: []).} =
|
||||
## Cancels the background loops. Persisted state is left intact.
|
||||
await self.rm.cleanup()
|
||||
await self.reliabilityManager.cleanup()
|
||||
|
||||
proc wrapOutgoing*(
|
||||
self: SdsHandler, payload: seq[byte]
|
||||
@ -147,7 +153,7 @@ proc wrapOutgoing*(
|
||||
## Wraps a segment with reliability metadata and registers it in the SDS
|
||||
## outgoing buffer awaiting end-to-end acknowledgement.
|
||||
let wrapped = (
|
||||
await self.rm.wrapOutgoingMessage(
|
||||
await self.reliabilityManager.wrapOutgoingMessage(
|
||||
payload, self.computeMessageId(payload), self.channelId
|
||||
)
|
||||
).valueOr:
|
||||
@ -160,7 +166,7 @@ proc handleIncoming*(
|
||||
## Returns the payloads deliverable now, in causal order. Empty when SDS
|
||||
## consumed the message; `err` on any ingress failure: undecodable envelope,
## `ensureChannel`/unwrap error, cancellation, or ingress-lock release failure.
|
||||
let msg = deserializeMessage(wire).valueOr:
|
||||
return err("SDS deserialization failed")
|
||||
return err("SDS deserialization failed: " & $error)
|
||||
|
||||
## Pre-filter: `unwrapReceivedMessage` auto-creates the channel it sees on
|
||||
## the wire, so foreign traffic must not reach it.
|
||||
@ -169,48 +175,63 @@ proc handleIncoming*(
|
||||
channelId = self.channelId, wireChannelId = msg.channelId
|
||||
return ok(newSeq[seq[byte]]())
|
||||
|
||||
## Only the lock acquisition can raise (CancelledError); the unwrap work
|
||||
## below is `raises: []`, so the try stays scoped to exactly the acquire.
|
||||
try:
|
||||
await self.ingressLock.acquire()
|
||||
try:
|
||||
## Load persisted state before the duplicate check, so a replay right
|
||||
## after a restart is not re-delivered. Idempotent, cheap once loaded.
|
||||
(await self.rm.ensureChannel(self.channelId)).isOkOr:
|
||||
return err("SDS ensureChannel failed: " & $error)
|
||||
except CancelledError:
|
||||
return err("SDS handleIncoming cancelled before acquiring ingress lock")
|
||||
|
||||
## The unwrap result does not distinguish first delivery from
|
||||
## duplicate, so capture delivered-before up front.
|
||||
let ctx = self.rm.channels.getOrDefault(self.channelId)
|
||||
let isDuplicate = not ctx.isNil() and msg.messageId in ctx.messageHistory
|
||||
## Funnel every unwrap outcome into `res` so the lock is released once on
|
||||
## the tail path, where a failed `ingressLock.release()` can surface its own error.
|
||||
var res: Result[seq[seq[byte]], string]
|
||||
block ingress:
|
||||
## Load persisted state before the duplicate check, so a replay right
|
||||
## after a restart is not re-delivered. Idempotent, cheap once loaded.
|
||||
(await self.reliabilityManager.ensureChannel(self.channelId)).isOkOr:
|
||||
res = err("SDS ensureChannel failed: " & $error)
|
||||
break ingress
|
||||
|
||||
self.released.setLen(0)
|
||||
let unwrapped = (await self.rm.unwrapReceivedMessage(wire)).valueOr:
|
||||
return err("SDS unwrap failed: " & $error)
|
||||
## The unwrap result does not distinguish first delivery from
|
||||
## duplicate, so capture delivered-before up front.
|
||||
let ctx = self.reliabilityManager.channels.getOrDefault(self.channelId)
|
||||
let isDuplicate = not ctx.isNil() and msg.messageId in ctx.messageHistory
|
||||
|
||||
if isDuplicate:
|
||||
return ok(newSeq[seq[byte]]())
|
||||
self.released.setLen(0)
|
||||
let unwrapped = (await self.reliabilityManager.unwrapReceivedMessage(wire)).valueOr:
|
||||
res = err("SDS unwrap failed: " & $error)
|
||||
break ingress
|
||||
|
||||
if unwrapped.missingDeps.len > 0:
|
||||
if self.pendingContent.len >= MaxPendingContent:
|
||||
var oldest: SdsMessageID
|
||||
for k in self.pendingContent.keys:
|
||||
oldest = k
|
||||
break
|
||||
self.pendingContent.del(oldest)
|
||||
warn "SDS pending-content stash full, dropping oldest entry",
|
||||
channelId = self.channelId, dropped = oldest
|
||||
self.pendingContent[msg.messageId] = unwrapped.message
|
||||
return ok(newSeq[seq[byte]]())
|
||||
if isDuplicate:
|
||||
res = ok(newSeq[seq[byte]]())
|
||||
break ingress
|
||||
|
||||
var deliverable = newSeq[seq[byte]]()
|
||||
if unwrapped.message.len > 0:
|
||||
## Empty content is sync traffic: causal metadata only.
|
||||
deliverable.add(unwrapped.message)
|
||||
deliverable.add(self.released)
|
||||
self.released.setLen(0)
|
||||
return ok(deliverable)
|
||||
finally:
|
||||
self.ingressLock.release()
|
||||
except CatchableError:
|
||||
return err("SDS handleIncoming failed: " & getCurrentExceptionMsg())
|
||||
if unwrapped.missingDeps.len > 0:
|
||||
if self.pendingContent.len >= MaxPendingContent:
|
||||
var oldest: SdsMessageID
|
||||
for k in self.pendingContent.keys:
|
||||
oldest = k
|
||||
break
|
||||
self.pendingContent.del(oldest)
|
||||
warn "SDS pending-content stash full, dropping oldest entry",
|
||||
channelId = self.channelId, dropped = oldest
|
||||
self.pendingContent[msg.messageId] = unwrapped.message
|
||||
res = ok(newSeq[seq[byte]]())
|
||||
break ingress
|
||||
|
||||
var deliverable = newSeq[seq[byte]]()
|
||||
if unwrapped.message.len > 0:
|
||||
## Empty content is sync traffic: causal metadata only.
|
||||
deliverable.add(unwrapped.message)
|
||||
deliverable.add(self.released)
|
||||
self.released.setLen(0)
|
||||
res = ok(deliverable)
|
||||
|
||||
try:
|
||||
self.ingressLock.release()
|
||||
except AsyncLockError as e:
|
||||
return err("SDS ingress lock release failed: " & e.msg)
|
||||
|
||||
return res
|
||||
|
||||
{.pop.}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user