From f0cd0c362415cf4f6b8440fc9a58cf703a1d6357 Mon Sep 17 00:00:00 2001 From: shash256 <111925100+shash256@users.noreply.github.com> Date: Mon, 17 Feb 2025 14:47:01 +0530 Subject: [PATCH] feat: add reliability.nim --- src/reliability.nim | 346 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 346 insertions(+) create mode 100644 src/reliability.nim diff --git a/src/reliability.nim b/src/reliability.nim new file mode 100644 index 0000000..0cc490e --- /dev/null +++ b/src/reliability.nim @@ -0,0 +1,346 @@ +import std/[times, locks, tables, sets] +import chronos, results +import ../src/[message, protobuf, reliability_utils, rolling_bloom_filter] + +proc newReliabilityManager*( + channelId: string, config: ReliabilityConfig = defaultConfig() +): Result[ReliabilityManager, ReliabilityError] = + ## Creates a new ReliabilityManager with the specified channel ID and configuration. + ## + ## Parameters: + ## - channelId: A unique identifier for the communication channel. + ## - config: Configuration options for the ReliabilityManager. If not provided, default configuration is used. + ## + ## Returns: + ## A Result containing either a new ReliabilityManager instance or an error. + if channelId.len == 0: + return err(reInvalidArgument) + + try: + let bloomFilter = newRollingBloomFilter( + config.bloomFilterCapacity, config.bloomFilterErrorRate, config.bloomFilterWindow + ) + + let rm = ReliabilityManager( + lamportTimestamp: 0, + messageHistory: @[], + bloomFilter: bloomFilter, + outgoingBuffer: @[], + incomingBuffer: @[], + channelId: channelId, + config: config, + ) + initLock(rm.lock) + return ok(rm) + except: + return err(reOutOfMemory) + +proc reviewAckStatus(rm: ReliabilityManager, msg: Message) = + var i = 0 + while i < rm.outgoingBuffer.len: + var acknowledged = false + let outMsg = rm.outgoingBuffer[i] + + # Check if message is in causal history + for msgID in msg.causalHistory: + if outMsg.message.messageId == msgID: + acknowledged = true + break + + # Check bloom filter if not already acknowledged + if not acknowledged and msg.bloomFilter.len > 0: + let bfResult = deserializeBloomFilter(msg.bloomFilter) + if bfResult.isOk: + var rbf = RollingBloomFilter( + filter: bfResult.get(), window: rm.bloomFilter.window, messages: @[] + ) + if rbf.contains(outMsg.message.messageId): + acknowledged = true + else: + logError("Failed to deserialize bloom filter") + + if acknowledged: + if rm.onMessageSent != nil: + rm.onMessageSent(outMsg.message.messageId) + rm.outgoingBuffer.delete(i) + else: + inc i + +proc wrapOutgoingMessage*( + rm: ReliabilityManager, message: seq[byte], messageId: MessageID +): Result[seq[byte], ReliabilityError] = + ## Wraps an outgoing message with reliability metadata. + ## + ## Parameters: + ## - message: The content of the message to be sent. + ## + ## Returns: + ## A Result containing either a Message object with reliability metadata or an error. + if message.len == 0: + return err(reInvalidArgument) + if message.len > MaxMessageSize: + return err(reMessageTooLarge) + + withLock rm.lock: + try: + rm.updateLamportTimestamp(getTime().toUnix) + + # Serialize current bloom filter + var bloomBytes: seq[byte] + let bfResult = serializeBloomFilter(rm.bloomFilter.filter) + if bfResult.isErr: + logError("Failed to serialize bloom filter") + bloomBytes = @[] + else: + bloomBytes = bfResult.get() + + let msg = Message( + messageId: messageId, + lamportTimestamp: rm.lamportTimestamp, + causalHistory: rm.getRecentMessageIDs(rm.config.maxCausalHistory), + channelId: rm.channelId, + content: message, + bloomFilter: bloomBytes, + ) + + # Add to outgoing buffer + rm.outgoingBuffer.add( + UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0) + ) + + # Add to causal history and bloom filter + rm.bloomFilter.add(msg.messageId) + rm.addToHistory(msg.messageId) + + return serializeMessage(msg) + except: + return err(reInternalError) + +proc processIncomingBuffer(rm: ReliabilityManager) = + withLock rm.lock: + if rm.incomingBuffer.len == 0: + return + + # Create dependency map + var dependencies = initTable[MessageID, seq[MessageID]]() + var readyToProcess: seq[MessageID] = @[] + + # Build dependency graph and find initially ready messages + for msg in rm.incomingBuffer: + var hasMissingDeps = false + for depId in msg.causalHistory: + if not rm.bloomFilter.contains(depId): + hasMissingDeps = true + if depId notin dependencies: + dependencies[depId] = @[] + dependencies[depId].add(msg.messageId) + + if not hasMissingDeps: + readyToProcess.add(msg.messageId) + + # Process ready messages and their dependents + var newIncomingBuffer: seq[Message] = @[] + var processed = initHashSet[MessageID]() + + while readyToProcess.len > 0: + let msgId = readyToProcess.pop() + if msgId in processed: + continue + + # Process this message + for msg in rm.incomingBuffer: + if msg.messageId == msgId: + rm.addToHistory(msg.messageId) + if rm.onMessageReady != nil: + rm.onMessageReady(msg.messageId) + processed.incl(msgId) + + # Add any dependent messages that might now be ready + if msgId in dependencies: + for dependentId in dependencies[msgId]: + readyToProcess.add(dependentId) + break + + # Update incomingBuffer with remaining messages + for msg in rm.incomingBuffer: + if msg.messageId notin processed: + newIncomingBuffer.add(msg) + + rm.incomingBuffer = newIncomingBuffer + +proc unwrapReceivedMessage*( + rm: ReliabilityManager, message: seq[byte] +): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] = + ## Unwraps a received message and processes its reliability metadata. + ## + ## Parameters: + ## - message: The received Message object. + ## + ## Returns: + ## A Result containing either a tuple with the processed message and missing dependencies, or an error. + try: + let msgResult = deserializeMessage(message) + if not msgResult.isOk: + return err(msgResult.error) + + let msg = msgResult.get + if rm.bloomFilter.contains(msg.messageId): + return ok((msg.content, @[])) + + rm.bloomFilter.add(msg.messageId) + + # Update Lamport timestamp + rm.updateLamportTimestamp(msg.lamportTimestamp) + + # Review ACK status for outgoing messages + rm.reviewAckStatus(msg) + + var missingDeps: seq[MessageID] = @[] + for depId in msg.causalHistory: + if not rm.bloomFilter.contains(depId): + missingDeps.add(depId) + + if missingDeps.len == 0: + # Check if any dependencies are still in incoming buffer + var depsInBuffer = false + for bufferedMsg in rm.incomingBuffer: + if bufferedMsg.messageId in msg.causalHistory: + depsInBuffer = true + break + if depsInBuffer: + rm.incomingBuffer.add(msg) + else: + # All dependencies met, add to history + rm.addToHistory(msg.messageId) + rm.processIncomingBuffer() + if rm.onMessageReady != nil: + rm.onMessageReady(msg.messageId) + else: + # Buffer message and request missing dependencies + rm.incomingBuffer.add(msg) + if rm.onMissingDependencies != nil: + rm.onMissingDependencies(msg.messageId, missingDeps) + + return ok((msg.content, missingDeps)) + except: + return err(reInternalError) + +proc markDependenciesMet*( + rm: ReliabilityManager, messageIds: seq[MessageID] +): Result[void, ReliabilityError] = + ## Marks the specified message dependencies as met. + ## + ## Parameters: + ## - messageIds: A sequence of message IDs to mark as met. + ## + ## Returns: + ## A Result indicating success or an error. + try: + # Add all messageIds to bloom filter + for msgId in messageIds: + if not rm.bloomFilter.contains(msgId): + rm.bloomFilter.add(msgId) + # rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application? + rm.processIncomingBuffer() + + return ok() + except: + return err(reInternalError) + +proc setCallbacks*( + rm: ReliabilityManager, + onMessageReady: proc(messageId: MessageID) {.gcsafe.}, + onMessageSent: proc(messageId: MessageID) {.gcsafe.}, + onMissingDependencies: + proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}, + onPeriodicSync: PeriodicSyncCallback = nil, +) = + ## Sets the callback functions for various events in the ReliabilityManager. + ## + ## Parameters: + ## - onMessageReady: Callback function called when a message is ready to be processed. + ## - onMessageSent: Callback function called when a message is confirmed as sent. + ## - onMissingDependencies: Callback function called when a message has missing dependencies. + ## - onPeriodicSync: Callback function called to notify about periodic sync + withLock rm.lock: + rm.onMessageReady = onMessageReady + rm.onMessageSent = onMessageSent + rm.onMissingDependencies = onMissingDependencies + rm.onPeriodicSync = onPeriodicSync + +proc checkUnacknowledgedMessages*(rm: ReliabilityManager) {.raises: [].} = + ## Checks and processes unacknowledged messages in the outgoing buffer. + withLock rm.lock: + let now = getTime() + var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[] + + try: + for unackMsg in rm.outgoingBuffer: + let elapsed = now - unackMsg.sendTime + if elapsed > rm.config.resendInterval: + # Time to attempt resend + if unackMsg.resendAttempts < rm.config.maxResendAttempts: + var updatedMsg = unackMsg + updatedMsg.resendAttempts += 1 + updatedMsg.sendTime = now + newOutgoingBuffer.add(updatedMsg) + else: + if rm.onMessageSent != nil: + rm.onMessageSent(unackMsg.message.messageId) + else: + newOutgoingBuffer.add(unackMsg) + + rm.outgoingBuffer = newOutgoingBuffer + except Exception as e: + logError("Error in checking unacknowledged messages: " & e.msg) + +proc periodicBufferSweep(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} = + ## Periodically sweeps the buffer to clean up and check unacknowledged messages. + while true: + {.gcsafe.}: + try: + rm.checkUnacknowledgedMessages() + rm.cleanBloomFilter() + except Exception as e: + logError("Error in periodic buffer sweep: " & e.msg) + + await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds)) + +proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} = + ## Periodically notifies to send a sync message to maintain connectivity. + while true: + {.gcsafe.}: + try: + if rm.onPeriodicSync != nil: + rm.onPeriodicSync() + except Exception as e: + logError("Error in periodic sync: " & e.msg) + await sleepAsync(chronos.seconds(rm.config.syncMessageInterval.inSeconds)) + +proc startPeriodicTasks*(rm: ReliabilityManager) = + ## Starts the periodic tasks for buffer sweeping and sync message sending. + ## + ## This procedure should be called after creating a ReliabilityManager to enable automatic maintenance. + asyncSpawn rm.periodicBufferSweep() + asyncSpawn rm.periodicSyncMessage() + +proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityError] = + ## Resets the ReliabilityManager to its initial state. + ## + ## This procedure clears all buffers and resets the Lamport timestamp. + ## + ## Returns: + ## A Result indicating success or an error if the Bloom filter initialization fails. + withLock rm.lock: + try: + rm.lamportTimestamp = 0 + rm.messageHistory.setLen(0) + rm.outgoingBuffer.setLen(0) + rm.incomingBuffer.setLen(0) + rm.bloomFilter = newRollingBloomFilter( + rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate, + rm.config.bloomFilterWindow, + ) + return ok() + except: + return err(reInternalError)