diff --git a/.gitignore b/.gitignore index ab1f938..0b392bf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,9 @@ .DS_Store -tests/test_reliability -tests/bloom nph docs for_reference do_not_commit -build/* +/build/ result sds.nims /.update.timestamp diff --git a/sds.nim b/sds.nim index 2e16c74..1f11dd1 100644 --- a/sds.nim +++ b/sds.nim @@ -85,7 +85,9 @@ proc wrapOutgoingMessage*( message: seq[byte], messageId: SdsMessageID, channelId: SdsChannelID, -): Future[Result[seq[byte], ReliabilityError]] {.async: (raises: []), gcsafe.} = +): Future[Result[seq[byte], ReliabilityError]] {. + async: (raises: [CancelledError]), gcsafe +.} = ## Wraps an outgoing message with reliability metadata. if message.len == 0: return err(ReliabilityError.reInvalidArgument) @@ -156,6 +158,8 @@ proc wrapOutgoingMessage*( return err(ReliabilityError.reSerializationError) finally: rm.lock.release() + except CancelledError as e: + raise e except CatchableError: error "Failed to wrap message (lock)", channelId = channelId, msg = getCurrentExceptionMsg() @@ -219,7 +223,7 @@ proc unwrapReceivedMessage*( ): Future[Result[ tuple[message: seq[byte], missingDeps: seq[HistoryEntry], channelId: SdsChannelID], ReliabilityError, -]] {.async: (raises: []), gcsafe.} = +]] {.async: (raises: [CancelledError]), gcsafe.} = ## Unwraps a received message and processes its reliability metadata. try: let channelId = extractChannelId(message).valueOr: @@ -330,13 +334,15 @@ proc unwrapReceivedMessage*( await rm.persistence.saveOutgoingRepair(channelId, dep.messageId, outEntry) return ok((msg.content, missingDeps, channelId)) + except CancelledError as e: + raise e except CatchableError: error "Failed to unwrap message", msg = getCurrentExceptionMsg() return err(ReliabilityError.reDeserializationError) proc markDependenciesMet*( rm: ReliabilityManager, messageIds: seq[SdsMessageID], channelId: SdsChannelID -): Future[Result[void, ReliabilityError]] {.async: (raises: []), gcsafe.} = +): Future[Result[void, ReliabilityError]] {.async: (raises: [CancelledError]), gcsafe.} = ## Marks the specified message dependencies as met. try: if channelId notin rm.channels: @@ -364,6 +370,8 @@ proc markDependenciesMet*( await rm.processIncomingBuffer(channelId) return ok() + except CancelledError as e: + raise e except CatchableError: error "Failed to mark dependencies as met", channelId = channelId, msg = getCurrentExceptionMsg() @@ -377,7 +385,7 @@ proc setCallbacks*( onPeriodicSync: PeriodicSyncCallback = nil, onRetrievalHint: RetrievalHintProvider = nil, onRepairReady: RepairReadyCallback = nil, -): Future[void] {.async: (raises: []), gcsafe.} = +): Future[void] {.async: (raises: [CancelledError]), gcsafe.} = ## Sets the callback functions for various events in the ReliabilityManager. try: await rm.lock.acquire() @@ -390,12 +398,14 @@ proc setCallbacks*( rm.onRepairReady = onRepairReady finally: rm.lock.release() + except CancelledError as e: + raise e except CatchableError: error "Failed to set callbacks", msg = getCurrentExceptionMsg() proc checkUnacknowledgedMessages( rm: ReliabilityManager, channelId: SdsChannelID -): Future[void] {.async: (raises: []), gcsafe.} = +): Future[void] {.async: (raises: [CancelledError]), gcsafe.} = try: await rm.lock.acquire() try: @@ -427,6 +437,8 @@ proc checkUnacknowledgedMessages( channel.outgoingBuffer = newOutgoingBuffer finally: rm.lock.release() + except CancelledError as e: + raise e except CatchableError: error "Failed to check unacknowledged messages", channelId = channelId, msg = getCurrentExceptionMsg() @@ -460,7 +472,7 @@ proc periodicSyncMessage( proc runRepairSweep*( rm: ReliabilityManager -): Future[void] {.async: (raises: []), gcsafe.} = +): Future[void] {.async: (raises: [CancelledError]), gcsafe.} = ## SDS-R: Runs a single pass of the repair sweep. ## - Incoming: fires onRepairReady for expired T_resp entries and removes them ## - Outgoing: drops entries past T_max window @@ -501,6 +513,8 @@ proc runRepairSweep*( channelId = channelId, msg = getCurrentExceptionMsg() finally: rm.lock.release() + except CancelledError as e: + raise e except CatchableError: error "Error in repair sweep", msg = getCurrentExceptionMsg() @@ -523,7 +537,7 @@ proc startPeriodicTasks*(rm: ReliabilityManager) = proc resetReliabilityManager*( rm: ReliabilityManager -): Future[Result[void, ReliabilityError]] {.async: (raises: []), gcsafe.} = +): Future[Result[void, ReliabilityError]] {.async: (raises: [CancelledError]), gcsafe.} = ## Resets the ReliabilityManager to its initial state. try: await rm.lock.acquire() @@ -547,6 +561,8 @@ proc resetReliabilityManager*( return err(ReliabilityError.reInternalError) finally: rm.lock.release() + except CancelledError as e: + raise e except CatchableError: error "Failed to reset ReliabilityManager (lock)", msg = getCurrentExceptionMsg() return err(ReliabilityError.reInternalError) diff --git a/sds.nimble b/sds.nimble index a3c75b3..6139576 100644 --- a/sds.nimble +++ b/sds.nimble @@ -65,9 +65,9 @@ proc getMyCpu(): string = # Tasks task test, "Run the test suite": - exec "nim c -r tests/test_bloom.nim" - exec "nim c -r tests/test_reliability.nim" - exec "nim c -r tests/test_persistence.nim" + exec "nim c -r --outdir:build tests/test_bloom.nim" + exec "nim c -r --outdir:build tests/test_reliability.nim" + exec "nim c -r --outdir:build tests/test_persistence.nim" task libsdsDynamicWindows, "Generate bindings": let outLibNameAndExt = "libsds.dll" diff --git a/sds/sds_utils.nim b/sds/sds_utils.nim index 0e3a07d..38de29e 100644 --- a/sds/sds_utils.nim +++ b/sds/sds_utils.nim @@ -56,7 +56,7 @@ proc cleanup*( proc cleanBloomFilter*( rm: ReliabilityManager, channelId: SdsChannelID -): Future[void] {.async: (raises: []), gcsafe.} = +): Future[void] {.async: (raises: [CancelledError]), gcsafe.} = try: await rm.lock.acquire() try: @@ -64,6 +64,8 @@ proc cleanBloomFilter*( rm.channels[channelId].bloomFilter.clean() finally: rm.lock.release() + except CancelledError as e: + raise e except CatchableError: error "Failed to clean bloom filter", error = getCurrentExceptionMsg(), channelId = channelId @@ -215,7 +217,7 @@ proc checkDependencies*( proc getMessageHistory*( rm: ReliabilityManager, channelId: SdsChannelID -): Future[seq[SdsMessageID]] {.async: (raises: []), gcsafe.} = +): Future[seq[SdsMessageID]] {.async: (raises: [CancelledError]), gcsafe.} = try: await rm.lock.acquire() try: @@ -228,6 +230,8 @@ proc getMessageHistory*( return @[] finally: rm.lock.release() + except CancelledError as e: + raise e except CatchableError: error "Failed to get message history", channelId = channelId, error = getCurrentExceptionMsg() @@ -235,7 +239,7 @@ proc getMessageHistory*( proc getOutgoingBuffer*( rm: ReliabilityManager, channelId: SdsChannelID -): Future[seq[UnacknowledgedMessage]] {.async: (raises: []), gcsafe.} = +): Future[seq[UnacknowledgedMessage]] {.async: (raises: [CancelledError]), gcsafe.} = try: await rm.lock.acquire() try: @@ -245,6 +249,8 @@ proc getOutgoingBuffer*( return @[] finally: rm.lock.release() + except CancelledError as e: + raise e except CatchableError: error "Failed to get outgoing buffer", channelId = channelId, error = getCurrentExceptionMsg() @@ -252,7 +258,9 @@ proc getOutgoingBuffer*( proc getIncomingBuffer*( rm: ReliabilityManager, channelId: SdsChannelID -): Future[Table[SdsMessageID, IncomingMessage]] {.async: (raises: []), gcsafe.} = +): Future[Table[SdsMessageID, IncomingMessage]] {. + async: (raises: [CancelledError]), gcsafe +.} = try: await rm.lock.acquire() try: @@ -262,6 +270,8 @@ proc getIncomingBuffer*( return initTable[SdsMessageID, IncomingMessage]() finally: rm.lock.release() + except CancelledError as e: + raise e except CatchableError: error "Failed to get incoming buffer", channelId = channelId, error = getCurrentExceptionMsg() @@ -301,19 +311,23 @@ proc getOrCreateChannel*( proc ensureChannel*( rm: ReliabilityManager, channelId: SdsChannelID -): Future[Result[void, ReliabilityError]] {.async: (raises: []), gcsafe.} = +): Future[Result[void, ReliabilityError]] {.async: (raises: [CancelledError]), gcsafe.} = try: await rm.lock.acquire() try: try: discard await rm.getOrCreateChannel(channelId) return ok() + except CancelledError as e: + raise e except CatchableError: error "Failed to ensure channel", channelId = channelId, msg = getCurrentExceptionMsg() return err(ReliabilityError.reInternalError) finally: rm.lock.release() + except CancelledError as e: + raise e except CatchableError: error "Failed to ensure channel (lock)", channelId = channelId, msg = getCurrentExceptionMsg() @@ -321,7 +335,7 @@ proc ensureChannel*( proc removeChannel*( rm: ReliabilityManager, channelId: SdsChannelID -): Future[Result[void, ReliabilityError]] {.async: (raises: []), gcsafe.} = +): Future[Result[void, ReliabilityError]] {.async: (raises: [CancelledError]), gcsafe.} = try: await rm.lock.acquire() try: @@ -342,6 +356,8 @@ proc removeChannel*( return err(ReliabilityError.reInternalError) finally: rm.lock.release() + except CancelledError as e: + raise e except CatchableError: error "Failed to remove channel (lock)", channelId = channelId, msg = getCurrentExceptionMsg() diff --git a/tests/test_bloom b/tests/test_bloom deleted file mode 100755 index ed43b69..0000000 Binary files a/tests/test_bloom and /dev/null differ diff --git a/tests/test_persistence b/tests/test_persistence deleted file mode 100755 index 69d512c..0000000 Binary files a/tests/test_persistence and /dev/null differ diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index c81abdd..20b7022 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -1667,7 +1667,7 @@ proc deliverExcept( senderId: SdsParticipantID, bytes: seq[byte], exclude: seq[SdsParticipantID], -): Future[void] {.async: (raises: []), gcsafe.} = +): Future[void] {.async: (raises: [CancelledError]), gcsafe.} = for pid, peer in bus.peers: if pid == senderId or pid in exclude: continue