diff --git a/src/reliability.nim b/src/reliability.nim index 0cc490e..ebb533d 100644 --- a/src/reliability.nim +++ b/src/reliability.nim @@ -1,9 +1,9 @@ -import std/[times, locks, tables, sets] -import chronos, results -import ../src/[message, protobuf, reliability_utils, rolling_bloom_filter] +import std/[times, locks, tables, sets, sequtils] +import chronos, results, chronicles +import ./[message, protobuf, reliability_utils, rolling_bloom_filter] proc newReliabilityManager*( - channelId: string, config: ReliabilityConfig = defaultConfig() + channelId: SdsChannelID, config: ReliabilityConfig = defaultConfig() ): Result[ReliabilityManager, ReliabilityError] = ## Creates a new ReliabilityManager with the specified channel ID and configuration. ## @@ -14,12 +14,11 @@ proc newReliabilityManager*( ## Returns: ## A Result containing either a new ReliabilityManager instance or an error. if channelId.len == 0: - return err(reInvalidArgument) + return err(ReliabilityError.reInvalidArgument) try: - let bloomFilter = newRollingBloomFilter( - config.bloomFilterCapacity, config.bloomFilterErrorRate, config.bloomFilterWindow - ) + let bloomFilter = + newRollingBloomFilter(config.bloomFilterCapacity, config.bloomFilterErrorRate) let rm = ReliabilityManager( lamportTimestamp: 0, @@ -32,10 +31,11 @@ proc newReliabilityManager*( ) initLock(rm.lock) return ok(rm) - except: - return err(reOutOfMemory) + except Exception: + error "Failed to create ReliabilityManager", msg = getCurrentExceptionMsg() + return err(ReliabilityError.reOutOfMemory) -proc reviewAckStatus(rm: ReliabilityManager, msg: Message) = +proc reviewAckStatus(rm: ReliabilityManager, msg: SdsMessage) {.gcsafe.} = var i = 0 while i < rm.outgoingBuffer.len: var acknowledged = false @@ -50,57 +50,62 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: Message) = # Check bloom filter if not already acknowledged if not acknowledged and msg.bloomFilter.len > 0: let bfResult = deserializeBloomFilter(msg.bloomFilter) - if bfResult.isOk: + if bfResult.isOk(): var rbf = RollingBloomFilter( - filter: bfResult.get(), window: rm.bloomFilter.window, messages: @[] + filter: bfResult.get(), + capacity: bfResult.get().capacity, + minCapacity: ( + bfResult.get().capacity.float * (100 - CapacityFlexPercent).float / 100.0 + ).int, + maxCapacity: ( + bfResult.get().capacity.float * (100 + CapacityFlexPercent).float / 100.0 + ).int, + messages: @[], ) if rbf.contains(outMsg.message.messageId): acknowledged = true else: - logError("Failed to deserialize bloom filter") + error "Failed to deserialize bloom filter", error = bfResult.error if acknowledged: - if rm.onMessageSent != nil: + if not rm.onMessageSent.isNil(): rm.onMessageSent(outMsg.message.messageId) rm.outgoingBuffer.delete(i) else: inc i proc wrapOutgoingMessage*( - rm: ReliabilityManager, message: seq[byte], messageId: MessageID + rm: ReliabilityManager, message: seq[byte], messageId: SdsMessageID ): Result[seq[byte], ReliabilityError] = ## Wraps an outgoing message with reliability metadata. ## ## Parameters: ## - message: The content of the message to be sent. + ## - messageId: Unique identifier for the message ## ## Returns: - ## A Result containing either a Message object with reliability metadata or an error. + ## A Result containing either wrapped message bytes or an error. if message.len == 0: - return err(reInvalidArgument) + return err(ReliabilityError.reInvalidArgument) if message.len > MaxMessageSize: - return err(reMessageTooLarge) + return err(ReliabilityError.reMessageTooLarge) withLock rm.lock: try: rm.updateLamportTimestamp(getTime().toUnix) - # Serialize current bloom filter - var bloomBytes: seq[byte] let bfResult = serializeBloomFilter(rm.bloomFilter.filter) if bfResult.isErr: - logError("Failed to serialize bloom filter") - bloomBytes = @[] - else: - bloomBytes = bfResult.get() + error "Failed to serialize bloom filter" + return err(ReliabilityError.reSerializationError) - let msg = Message( + let msg = SdsMessage( messageId: messageId, lamportTimestamp: rm.lamportTimestamp, - causalHistory: rm.getRecentMessageIDs(rm.config.maxCausalHistory), + causalHistory: rm.getRecentSdsMessageIDs(rm.config.maxCausalHistory), channelId: rm.channelId, content: message, - bloomFilter: bloomBytes, + bloomFilter: bfResult.get(), ) # Add to outgoing buffer @@ -113,17 +118,19 @@ proc wrapOutgoingMessage*( rm.addToHistory(msg.messageId) return serializeMessage(msg) - except: - return err(reInternalError) + except Exception: + error "Failed to wrap message", msg = getCurrentExceptionMsg() + return err(ReliabilityError.reSerializationError) -proc processIncomingBuffer(rm: ReliabilityManager) = +proc processIncomingBuffer(rm: ReliabilityManager) {.gcsafe.} = withLock rm.lock: if rm.incomingBuffer.len == 0: return # Create dependency map - var dependencies = initTable[MessageID, seq[MessageID]]() - var readyToProcess: seq[MessageID] = @[] + var dependencies = initTable[SdsMessageID, seq[SdsMessageID]]() + var readyToProcess: seq[SdsMessageID] = @[] + var processed = initHashSet[SdsMessageID]() # Build dependency graph and find initially ready messages for msg in rm.incomingBuffer: @@ -138,10 +145,6 @@ proc processIncomingBuffer(rm: ReliabilityManager) = if not hasMissingDeps: readyToProcess.add(msg.messageId) - # Process ready messages and their dependents - var newIncomingBuffer: seq[Message] = @[] - var processed = initHashSet[MessageID]() - while readyToProcess.len > 0: let msgId = readyToProcess.pop() if msgId in processed: @@ -151,39 +154,31 @@ proc processIncomingBuffer(rm: ReliabilityManager) = for msg in rm.incomingBuffer: if msg.messageId == msgId: rm.addToHistory(msg.messageId) - if rm.onMessageReady != nil: + if not rm.onMessageReady.isNil(): rm.onMessageReady(msg.messageId) processed.incl(msgId) # Add any dependent messages that might now be ready if msgId in dependencies: - for dependentId in dependencies[msgId]: - readyToProcess.add(dependentId) + readyToProcess.add(dependencies[msgId]) break - # Update incomingBuffer with remaining messages - for msg in rm.incomingBuffer: - if msg.messageId notin processed: - newIncomingBuffer.add(msg) - - rm.incomingBuffer = newIncomingBuffer + rm.incomingBuffer = rm.incomingBuffer.filterIt(it.messageId notin processed) proc unwrapReceivedMessage*( rm: ReliabilityManager, message: seq[byte] -): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] = +): Result[tuple[message: seq[byte], missingDeps: seq[SdsMessageID]], ReliabilityError] = ## Unwraps a received message and processes its reliability metadata. ## ## Parameters: - ## - message: The received Message object. + ## - message: The received message bytes ## ## Returns: - ## A Result containing either a tuple with the processed message and missing dependencies, or an error. + ## A Result containing either tuple of (processed message, missing dependencies) or an error. try: - let msgResult = deserializeMessage(message) - if not msgResult.isOk: - return err(msgResult.error) + let msg = deserializeMessage(message).valueOr: + return err(ReliabilityError.reDeserializationError) - let msg = msgResult.get if rm.bloomFilter.contains(msg.messageId): return ok((msg.content, @[])) @@ -195,7 +190,7 @@ proc unwrapReceivedMessage*( # Review ACK status for outgoing messages rm.reviewAckStatus(msg) - var missingDeps: seq[MessageID] = @[] + var missingDeps: seq[SdsMessageID] = @[] for depId in msg.causalHistory: if not rm.bloomFilter.contains(depId): missingDeps.add(depId) @@ -207,26 +202,27 @@ proc unwrapReceivedMessage*( if bufferedMsg.messageId in msg.causalHistory: depsInBuffer = true break + if depsInBuffer: rm.incomingBuffer.add(msg) else: # All dependencies met, add to history rm.addToHistory(msg.messageId) rm.processIncomingBuffer() - if rm.onMessageReady != nil: + if not rm.onMessageReady.isNil(): rm.onMessageReady(msg.messageId) else: - # Buffer message and request missing dependencies rm.incomingBuffer.add(msg) - if rm.onMissingDependencies != nil: + if not rm.onMissingDependencies.isNil(): rm.onMissingDependencies(msg.messageId, missingDeps) return ok((msg.content, missingDeps)) - except: - return err(reInternalError) + except Exception: + error "Failed to unwrap message", msg = getCurrentExceptionMsg() + return err(ReliabilityError.reDeserializationError) proc markDependenciesMet*( - rm: ReliabilityManager, messageIds: seq[MessageID] + rm: ReliabilityManager, messageIds: seq[SdsMessageID] ): Result[void, ReliabilityError] = ## Marks the specified message dependencies as met. ## @@ -241,18 +237,19 @@ proc markDependenciesMet*( if not rm.bloomFilter.contains(msgId): rm.bloomFilter.add(msgId) # rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application? - rm.processIncomingBuffer() + rm.processIncomingBuffer() return ok() - except: - return err(reInternalError) + except Exception: + error "Failed to mark dependencies as met", msg = getCurrentExceptionMsg() + return err(ReliabilityError.reInternalError) proc setCallbacks*( rm: ReliabilityManager, - onMessageReady: proc(messageId: MessageID) {.gcsafe.}, - onMessageSent: proc(messageId: MessageID) {.gcsafe.}, + onMessageReady: proc(messageId: SdsMessageID) {.gcsafe.}, + onMessageSent: proc(messageId: SdsMessageID) {.gcsafe.}, onMissingDependencies: - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}, + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.}, onPeriodicSync: PeriodicSyncCallback = nil, ) = ## Sets the callback functions for various events in the ReliabilityManager. @@ -268,53 +265,52 @@ proc setCallbacks*( rm.onMissingDependencies = onMissingDependencies rm.onPeriodicSync = onPeriodicSync -proc checkUnacknowledgedMessages*(rm: ReliabilityManager) {.raises: [].} = +proc checkUnacknowledgedMessages(rm: ReliabilityManager) {.gcsafe.} = ## Checks and processes unacknowledged messages in the outgoing buffer. withLock rm.lock: let now = getTime() var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[] - try: - for unackMsg in rm.outgoingBuffer: - let elapsed = now - unackMsg.sendTime - if elapsed > rm.config.resendInterval: - # Time to attempt resend - if unackMsg.resendAttempts < rm.config.maxResendAttempts: - var updatedMsg = unackMsg - updatedMsg.resendAttempts += 1 - updatedMsg.sendTime = now - newOutgoingBuffer.add(updatedMsg) - else: - if rm.onMessageSent != nil: - rm.onMessageSent(unackMsg.message.messageId) + for unackMsg in rm.outgoingBuffer: + let elapsed = now - unackMsg.sendTime + if elapsed > rm.config.resendInterval: + # Time to attempt resend + if unackMsg.resendAttempts < rm.config.maxResendAttempts: + var updatedMsg = unackMsg + updatedMsg.resendAttempts += 1 + updatedMsg.sendTime = now + newOutgoingBuffer.add(updatedMsg) else: - newOutgoingBuffer.add(unackMsg) + if not rm.onMessageSent.isNil(): + rm.onMessageSent(unackMsg.message.messageId) + else: + newOutgoingBuffer.add(unackMsg) - rm.outgoingBuffer = newOutgoingBuffer - except Exception as e: - logError("Error in checking unacknowledged messages: " & e.msg) + rm.outgoingBuffer = newOutgoingBuffer -proc periodicBufferSweep(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} = +proc periodicBufferSweep( + rm: ReliabilityManager +) {.async: (raises: [CancelledError]), gcsafe.} = ## Periodically sweeps the buffer to clean up and check unacknowledged messages. while true: - {.gcsafe.}: - try: - rm.checkUnacknowledgedMessages() - rm.cleanBloomFilter() - except Exception as e: - logError("Error in periodic buffer sweep: " & e.msg) + try: + rm.checkUnacknowledgedMessages() + rm.cleanBloomFilter() + except Exception: + error "Error in periodic buffer sweep", msg = getCurrentExceptionMsg() await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds)) -proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} = +proc periodicSyncMessage( + rm: ReliabilityManager +) {.async: (raises: [CancelledError]), gcsafe.} = ## Periodically notifies to send a sync message to maintain connectivity. while true: - {.gcsafe.}: - try: - if rm.onPeriodicSync != nil: - rm.onPeriodicSync() - except Exception as e: - logError("Error in periodic sync: " & e.msg) + try: + if not rm.onPeriodicSync.isNil(): + rm.onPeriodicSync() + except Exception: + error "Error in periodic sync", msg = getCurrentExceptionMsg() await sleepAsync(chronos.seconds(rm.config.syncMessageInterval.inSeconds)) proc startPeriodicTasks*(rm: ReliabilityManager) = @@ -328,9 +324,6 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE ## Resets the ReliabilityManager to its initial state. ## ## This procedure clears all buffers and resets the Lamport timestamp. - ## - ## Returns: - ## A Result indicating success or an error if the Bloom filter initialization fails. withLock rm.lock: try: rm.lamportTimestamp = 0 @@ -338,9 +331,9 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE rm.outgoingBuffer.setLen(0) rm.incomingBuffer.setLen(0) rm.bloomFilter = newRollingBloomFilter( - rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate, - rm.config.bloomFilterWindow, + rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate ) return ok() - except: - return err(reInternalError) + except Exception: + error "Failed to reset ReliabilityManager", msg = getCurrentExceptionMsg() + return err(ReliabilityError.reInternalError)