Correctly handle future cancellation exceptions, +some housekeeping

This commit is contained in:
NagyZoltanPeter 2026-05-22 15:35:05 +02:00
parent 90a6415849
commit 8ae1a60912
No known key found for this signature in database
GPG Key ID: 3E1F97CF4A7B6F42
7 changed files with 50 additions and 20 deletions

4
.gitignore vendored
View File

@ -1,11 +1,9 @@
.DS_Store
tests/test_reliability
tests/bloom
nph
docs
for_reference
do_not_commit
build/*
/build/
result
sds.nims
/.update.timestamp

30
sds.nim
View File

@ -85,7 +85,9 @@ proc wrapOutgoingMessage*(
message: seq[byte],
messageId: SdsMessageID,
channelId: SdsChannelID,
): Future[Result[seq[byte], ReliabilityError]] {.async: (raises: []), gcsafe.} =
): Future[Result[seq[byte], ReliabilityError]] {.
async: (raises: [CancelledError]), gcsafe
.} =
## Wraps an outgoing message with reliability metadata.
if message.len == 0:
return err(ReliabilityError.reInvalidArgument)
@ -156,6 +158,8 @@ proc wrapOutgoingMessage*(
return err(ReliabilityError.reSerializationError)
finally:
rm.lock.release()
except CancelledError as e:
raise e
except CatchableError:
error "Failed to wrap message (lock)",
channelId = channelId, msg = getCurrentExceptionMsg()
@ -219,7 +223,7 @@ proc unwrapReceivedMessage*(
): Future[Result[
tuple[message: seq[byte], missingDeps: seq[HistoryEntry], channelId: SdsChannelID],
ReliabilityError,
]] {.async: (raises: []), gcsafe.} =
]] {.async: (raises: [CancelledError]), gcsafe.} =
## Unwraps a received message and processes its reliability metadata.
try:
let channelId = extractChannelId(message).valueOr:
@ -330,13 +334,15 @@ proc unwrapReceivedMessage*(
await rm.persistence.saveOutgoingRepair(channelId, dep.messageId, outEntry)
return ok((msg.content, missingDeps, channelId))
except CancelledError as e:
raise e
except CatchableError:
error "Failed to unwrap message", msg = getCurrentExceptionMsg()
return err(ReliabilityError.reDeserializationError)
proc markDependenciesMet*(
rm: ReliabilityManager, messageIds: seq[SdsMessageID], channelId: SdsChannelID
): Future[Result[void, ReliabilityError]] {.async: (raises: []), gcsafe.} =
): Future[Result[void, ReliabilityError]] {.async: (raises: [CancelledError]), gcsafe.} =
## Marks the specified message dependencies as met.
try:
if channelId notin rm.channels:
@ -364,6 +370,8 @@ proc markDependenciesMet*(
await rm.processIncomingBuffer(channelId)
return ok()
except CancelledError as e:
raise e
except CatchableError:
error "Failed to mark dependencies as met",
channelId = channelId, msg = getCurrentExceptionMsg()
@ -377,7 +385,7 @@ proc setCallbacks*(
onPeriodicSync: PeriodicSyncCallback = nil,
onRetrievalHint: RetrievalHintProvider = nil,
onRepairReady: RepairReadyCallback = nil,
): Future[void] {.async: (raises: []), gcsafe.} =
): Future[void] {.async: (raises: [CancelledError]), gcsafe.} =
## Sets the callback functions for various events in the ReliabilityManager.
try:
await rm.lock.acquire()
@ -390,12 +398,14 @@ proc setCallbacks*(
rm.onRepairReady = onRepairReady
finally:
rm.lock.release()
except CancelledError as e:
raise e
except CatchableError:
error "Failed to set callbacks", msg = getCurrentExceptionMsg()
proc checkUnacknowledgedMessages(
rm: ReliabilityManager, channelId: SdsChannelID
): Future[void] {.async: (raises: []), gcsafe.} =
): Future[void] {.async: (raises: [CancelledError]), gcsafe.} =
try:
await rm.lock.acquire()
try:
@ -427,6 +437,8 @@ proc checkUnacknowledgedMessages(
channel.outgoingBuffer = newOutgoingBuffer
finally:
rm.lock.release()
except CancelledError as e:
raise e
except CatchableError:
error "Failed to check unacknowledged messages",
channelId = channelId, msg = getCurrentExceptionMsg()
@ -460,7 +472,7 @@ proc periodicSyncMessage(
proc runRepairSweep*(
rm: ReliabilityManager
): Future[void] {.async: (raises: []), gcsafe.} =
): Future[void] {.async: (raises: [CancelledError]), gcsafe.} =
## SDS-R: Runs a single pass of the repair sweep.
## - Incoming: fires onRepairReady for expired T_resp entries and removes them
## - Outgoing: drops entries past T_max window
@ -501,6 +513,8 @@ proc runRepairSweep*(
channelId = channelId, msg = getCurrentExceptionMsg()
finally:
rm.lock.release()
except CancelledError as e:
raise e
except CatchableError:
error "Error in repair sweep", msg = getCurrentExceptionMsg()
@ -523,7 +537,7 @@ proc startPeriodicTasks*(rm: ReliabilityManager) =
proc resetReliabilityManager*(
rm: ReliabilityManager
): Future[Result[void, ReliabilityError]] {.async: (raises: []), gcsafe.} =
): Future[Result[void, ReliabilityError]] {.async: (raises: [CancelledError]), gcsafe.} =
## Resets the ReliabilityManager to its initial state.
try:
await rm.lock.acquire()
@ -547,6 +561,8 @@ proc resetReliabilityManager*(
return err(ReliabilityError.reInternalError)
finally:
rm.lock.release()
except CancelledError as e:
raise e
except CatchableError:
error "Failed to reset ReliabilityManager (lock)", msg = getCurrentExceptionMsg()
return err(ReliabilityError.reInternalError)

View File

@ -65,9 +65,9 @@ proc getMyCpu(): string =
# Tasks
task test, "Run the test suite":
exec "nim c -r tests/test_bloom.nim"
exec "nim c -r tests/test_reliability.nim"
exec "nim c -r tests/test_persistence.nim"
exec "nim c -r --outdir:build tests/test_bloom.nim"
exec "nim c -r --outdir:build tests/test_reliability.nim"
exec "nim c -r --outdir:build tests/test_persistence.nim"
task libsdsDynamicWindows, "Generate bindings":
let outLibNameAndExt = "libsds.dll"

View File

@ -56,7 +56,7 @@ proc cleanup*(
proc cleanBloomFilter*(
rm: ReliabilityManager, channelId: SdsChannelID
): Future[void] {.async: (raises: []), gcsafe.} =
): Future[void] {.async: (raises: [CancelledError]), gcsafe.} =
try:
await rm.lock.acquire()
try:
@ -64,6 +64,8 @@ proc cleanBloomFilter*(
rm.channels[channelId].bloomFilter.clean()
finally:
rm.lock.release()
except CancelledError as e:
raise e
except CatchableError:
error "Failed to clean bloom filter",
error = getCurrentExceptionMsg(), channelId = channelId
@ -215,7 +217,7 @@ proc checkDependencies*(
proc getMessageHistory*(
rm: ReliabilityManager, channelId: SdsChannelID
): Future[seq[SdsMessageID]] {.async: (raises: []), gcsafe.} =
): Future[seq[SdsMessageID]] {.async: (raises: [CancelledError]), gcsafe.} =
try:
await rm.lock.acquire()
try:
@ -228,6 +230,8 @@ proc getMessageHistory*(
return @[]
finally:
rm.lock.release()
except CancelledError as e:
raise e
except CatchableError:
error "Failed to get message history",
channelId = channelId, error = getCurrentExceptionMsg()
@ -235,7 +239,7 @@ proc getMessageHistory*(
proc getOutgoingBuffer*(
rm: ReliabilityManager, channelId: SdsChannelID
): Future[seq[UnacknowledgedMessage]] {.async: (raises: []), gcsafe.} =
): Future[seq[UnacknowledgedMessage]] {.async: (raises: [CancelledError]), gcsafe.} =
try:
await rm.lock.acquire()
try:
@ -245,6 +249,8 @@ proc getOutgoingBuffer*(
return @[]
finally:
rm.lock.release()
except CancelledError as e:
raise e
except CatchableError:
error "Failed to get outgoing buffer",
channelId = channelId, error = getCurrentExceptionMsg()
@ -252,7 +258,9 @@ proc getOutgoingBuffer*(
proc getIncomingBuffer*(
rm: ReliabilityManager, channelId: SdsChannelID
): Future[Table[SdsMessageID, IncomingMessage]] {.async: (raises: []), gcsafe.} =
): Future[Table[SdsMessageID, IncomingMessage]] {.
async: (raises: [CancelledError]), gcsafe
.} =
try:
await rm.lock.acquire()
try:
@ -262,6 +270,8 @@ proc getIncomingBuffer*(
return initTable[SdsMessageID, IncomingMessage]()
finally:
rm.lock.release()
except CancelledError as e:
raise e
except CatchableError:
error "Failed to get incoming buffer",
channelId = channelId, error = getCurrentExceptionMsg()
@ -301,19 +311,23 @@ proc getOrCreateChannel*(
proc ensureChannel*(
rm: ReliabilityManager, channelId: SdsChannelID
): Future[Result[void, ReliabilityError]] {.async: (raises: []), gcsafe.} =
): Future[Result[void, ReliabilityError]] {.async: (raises: [CancelledError]), gcsafe.} =
try:
await rm.lock.acquire()
try:
try:
discard await rm.getOrCreateChannel(channelId)
return ok()
except CancelledError as e:
raise e
except CatchableError:
error "Failed to ensure channel",
channelId = channelId, msg = getCurrentExceptionMsg()
return err(ReliabilityError.reInternalError)
finally:
rm.lock.release()
except CancelledError as e:
raise e
except CatchableError:
error "Failed to ensure channel (lock)",
channelId = channelId, msg = getCurrentExceptionMsg()
@ -321,7 +335,7 @@ proc ensureChannel*(
proc removeChannel*(
rm: ReliabilityManager, channelId: SdsChannelID
): Future[Result[void, ReliabilityError]] {.async: (raises: []), gcsafe.} =
): Future[Result[void, ReliabilityError]] {.async: (raises: [CancelledError]), gcsafe.} =
try:
await rm.lock.acquire()
try:
@ -342,6 +356,8 @@ proc removeChannel*(
return err(ReliabilityError.reInternalError)
finally:
rm.lock.release()
except CancelledError as e:
raise e
except CatchableError:
error "Failed to remove channel (lock)",
channelId = channelId, msg = getCurrentExceptionMsg()

Binary file not shown.

Binary file not shown.

View File

@ -1667,7 +1667,7 @@ proc deliverExcept(
senderId: SdsParticipantID,
bytes: seq[byte],
exclude: seq[SdsParticipantID],
): Future[void] {.async: (raises: []), gcsafe.} =
): Future[void] {.async: (raises: [CancelledError]), gcsafe.} =
for pid, peer in bus.peers:
if pid == senderId or pid in exclude:
continue