From 77c30db82ee1f93a8cef60428295ef1618c176f5 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Mon, 25 Aug 2025 15:11:02 -0700 Subject: [PATCH] Export required types --- src/reliability.nim | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/reliability.nim b/src/reliability.nim index a39fac3..199a598 100644 --- a/src/reliability.nim +++ b/src/reliability.nim @@ -2,6 +2,8 @@ import std/[times, locks, tables, sets, options] import chronos, results, chronicles import ./[message, protobuf, reliability_utils, rolling_bloom_filter] +export message, reliability_utils, protobuf + proc newReliabilityManager*( config: ReliabilityConfig = defaultConfig() ): Result[ReliabilityManager, ReliabilityError] = @@ -47,12 +49,12 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: SdsMessage) {.gcsafe.} = capacity: bfResult.get().capacity, minCapacity: ( bfResult.get().capacity.float * (100 - CapacityFlexPercent).float / 100.0 - ).int, - maxCapacity: ( - bfResult.get().capacity.float * (100 + CapacityFlexPercent).float / 100.0 - ).int, - messages: @[], - ) + ).int, + maxCapacity: ( + bfResult.get().capacity.float * (100 + CapacityFlexPercent).float / 100.0 + ).int, + messages: @[], + ) ) else: error "Failed to deserialize bloom filter", error = bfResult.error @@ -112,14 +114,16 @@ proc wrapOutgoingMessage*( let msg = SdsMessage( messageId: messageId, lamportTimestamp: channel.lamportTimestamp, - causalHistory: rm.getRecentSdsMessageIDs(rm.config.maxCausalHistory, channelId), + causalHistory: rm.getRecentSdsMessageIDs(rm.config.maxCausalHistory, + channelId), channelId: channelId, content: message, bloomFilter: bfResult.get(), ) channel.outgoingBuffer.add( - UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0) + UnacknowledgedMessage(message: msg, sendTime: getTime(), + resendAttempts: 0) ) # Add to causal history and bloom filter @@ -132,7 +136,8 @@ proc wrapOutgoingMessage*( channelId = channelId, msg = getCurrentExceptionMsg() return err(ReliabilityError.reSerializationError) -proc processIncomingBuffer(rm: ReliabilityManager, channelId: SdsChannelID) {.gcsafe.} = +proc processIncomingBuffer(rm: ReliabilityManager, + channelId: SdsChannelID) {.gcsafe.} = withLock rm.lock: if channelId notin rm.channels: error "Channel does not exist", channelId = channelId @@ -176,7 +181,8 @@ proc processIncomingBuffer(rm: ReliabilityManager, channelId: SdsChannelID) {.gc proc unwrapReceivedMessage*( rm: ReliabilityManager, message: seq[byte] ): Result[ - tuple[message: seq[byte], missingDeps: seq[SdsMessageID], channelId: SdsChannelID], + tuple[message: seq[byte], missingDeps: seq[SdsMessageID], + channelId: SdsChannelID], ReliabilityError, ] = ## Unwraps a received message and processes its reliability metadata. @@ -234,7 +240,8 @@ proc unwrapReceivedMessage*( return err(ReliabilityError.reDeserializationError) proc markDependenciesMet*( - rm: ReliabilityManager, messageIds: seq[SdsMessageID], channelId: SdsChannelID + rm: ReliabilityManager, messageIds: seq[SdsMessageID], + channelId: SdsChannelID ): Result[void, ReliabilityError] = ## Marks the specified message dependencies as met. ## @@ -330,7 +337,8 @@ proc periodicBufferSweep( except Exception: error "Error in periodic buffer sweep", msg = getCurrentExceptionMsg() - await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds)) + await sleepAsync(chronos.milliseconds( + rm.config.bufferSweepInterval.inMilliseconds)) proc periodicSyncMessage( rm: ReliabilityManager @@ -351,7 +359,8 @@ proc startPeriodicTasks*(rm: ReliabilityManager) = asyncSpawn rm.periodicBufferSweep() asyncSpawn rm.periodicSyncMessage() -proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityError] = +proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, + ReliabilityError] = ## Resets the ReliabilityManager to its initial state. ## ## This procedure clears all buffers and resets the Lamport timestamp.