diff --git a/library/libsds.h b/library/libsds.h index 590e6a4..ff637d8 100644 --- a/library/libsds.h +++ b/library/libsds.h @@ -39,6 +39,12 @@ int WrapOutgoingMessage(void* ctx, SdsCallBack callback, void* userData); +int UnwrapReceivedMessage(void* ctx, + void* message, + size_t messageLen, + SdsCallBack callback, + void* userData); + #ifdef __cplusplus } diff --git a/library/libsds.nim b/library/libsds.nim index cbe3e93..8fea804 100644 --- a/library/libsds.nim +++ b/library/libsds.nim @@ -214,5 +214,33 @@ proc WrapOutgoingMessage( userData, ) +proc UnwrapReceivedMessage( + ctx: ptr SdsContext, + message: pointer, + messageLen: csize_t, + callback: SdsCallBack, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + + if message == nil and messageLen > 0: + let msg = "libsds error: " & "message pointer is NULL but length > 0" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + var msg = allocSharedSeqFromCArray(cast[ptr byte](message), messageLen.int) + + defer: + deallocSharedSeq(msg) + + handleRequest( + ctx, + RequestType.MESSAGE, + SdsMessageRequest.createShared(SdsMessageMsgType.UNWRAP_MESSAGE, msg, messageLen), + callback, + userData, + ) + ### End of exported procs ################################################################################ diff --git a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim index 99fddc0..77ec25f 100644 --- a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim +++ b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim @@ -6,6 +6,7 @@ import ../../../../src/[reliability_utils, reliability, message] type SdsMessageMsgType* = enum WRAP_MESSAGE + UNWRAP_MESSAGE type SdsMessageRequest* = object operation: SdsMessageMsgType @@ -13,6 +14,10 @@ type SdsMessageRequest* = object messageLen: csize_t messageId: cstring +type SdsUnwrapResponse* = object + message*: seq[byte] + missingDeps*: seq[MessageID] + proc createShared*( T: type SdsMessageRequest, op: SdsMessageMsgType, @@ -48,5 +53,16 @@ proc process*( # returns a comma-separates string of bytes return ok(wrappedMessage.mapIt($it).join(",")) + of UNWRAP_MESSAGE: + let messageBytes = self.message.toSeq() + + let (unwrappedMessage, missingDeps) = unwrapReceivedMessage(rm[], messageBytes).valueOr: + error "UNWRAP_MESSAGE failed", error = error + return err("error processing UNWRAP_MESSAGE request: " & $error) + + let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps) + + # return the result as a json string + return ok($(%*(res))) return ok("") diff --git a/src/reliability.nim b/src/reliability.nim index 29e3c36..0ad08e0 100644 --- a/src/reliability.nim +++ b/src/reliability.nim @@ -2,7 +2,9 @@ import std/[times, locks, tables, sets] import chronos, results import ../src/[message, protobuf, reliability_utils, rolling_bloom_filter] -proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defaultConfig()): Result[ReliabilityManager, ReliabilityError] = +proc newReliabilityManager*( + channelId: string, config: ReliabilityConfig = defaultConfig() +): Result[ReliabilityManager, ReliabilityError] = ## Creates a new ReliabilityManager with the specified channel ID and configuration. ## ## Parameters: @@ -13,14 +15,12 @@ proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defau ## A Result containing either a new ReliabilityManager instance or an error. if channelId.len == 0: return err(reInvalidArgument) - + try: let bloomFilter = newRollingBloomFilter( - config.bloomFilterCapacity, - config.bloomFilterErrorRate, - config.bloomFilterWindow + config.bloomFilterCapacity, config.bloomFilterErrorRate, config.bloomFilterWindow ) - + let rm = ReliabilityManager( lamportTimestamp: 0, messageHistory: @[], @@ -28,7 +28,7 @@ proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defau outgoingBuffer: @[], incomingBuffer: @[], channelId: channelId, - config: config + config: config, ) initLock(rm.lock) return ok(rm) @@ -40,27 +40,25 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: Message) = while i < rm.outgoingBuffer.len: var acknowledged = false let outMsg = rm.outgoingBuffer[i] - + # Check if message is in causal history for msgID in msg.causalHistory: if outMsg.message.messageId == msgID: acknowledged = true break - + # Check bloom filter if not already acknowledged if not acknowledged and msg.bloomFilter.len > 0: let bfResult = deserializeBloomFilter(msg.bloomFilter) if bfResult.isOk: var rbf = RollingBloomFilter( - filter: bfResult.get(), - window: rm.bloomFilter.window, - messages: @[] + filter: bfResult.get(), window: rm.bloomFilter.window, messages: @[] ) if rbf.contains(outMsg.message.messageId): acknowledged = true else: logError("Failed to deserialize bloom filter") - + if acknowledged: if rm.onMessageSent != nil: rm.onMessageSent(outMsg.message.messageId) @@ -68,7 +66,9 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: Message) = else: inc i -proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId: MessageID): Result[seq[byte], ReliabilityError] = +proc wrapOutgoingMessage*( + rm: ReliabilityManager, message: seq[byte], messageId: MessageID +): Result[seq[byte], ReliabilityError] = ## Wraps an outgoing message with reliability metadata. ## ## Parameters: @@ -84,7 +84,7 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId: withLock rm.lock: try: rm.updateLamportTimestamp(getTime().toUnix) - + # Serialize current bloom filter var bloomBytes: seq[byte] let bfResult = serializeBloomFilter(rm.bloomFilter.filter) @@ -100,15 +100,13 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId: causalHistory: rm.getRecentMessageIDs(rm.config.maxCausalHistory), channelId: rm.channelId, content: message, - bloomFilter: bloomBytes + bloomFilter: bloomBytes, ) # Add to outgoing buffer - rm.outgoingBuffer.add(UnacknowledgedMessage( - message: msg, - sendTime: getTime(), - resendAttempts: 0 - )) + rm.outgoingBuffer.add( + UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0) + ) # Add to causal history and bloom filter rm.bloomFilter.add(msg.messageId) @@ -156,7 +154,7 @@ proc processIncomingBuffer(rm: ReliabilityManager) = if rm.onMessageReady != nil: rm.onMessageReady(msg.messageId) processed.incl(msgId) - + # Add any dependent messages that might now be ready if msgId in dependencies: for dependentId in dependencies[msgId]: @@ -170,7 +168,11 @@ proc processIncomingBuffer(rm: ReliabilityManager) = rm.incomingBuffer = newIncomingBuffer -proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] = +proc unwrapReceivedMessage*( + rm: ReliabilityManager, message: seq[byte] +): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] {. + gcsafe +.} = ## Unwraps a received message and processes its reliability metadata. ## ## Parameters: @@ -182,7 +184,7 @@ proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[ let msgResult = deserializeMessage(message) if not msgResult.isOk: return err(msgResult.error) - + let msg = msgResult.get if rm.bloomFilter.contains(msg.messageId): return ok((msg.content, @[])) @@ -225,7 +227,9 @@ proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[ except: return err(reInternalError) -proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]): Result[void, ReliabilityError] = +proc markDependenciesMet*( + rm: ReliabilityManager, messageIds: seq[MessageID] +): Result[void, ReliabilityError] = ## Marks the specified message dependencies as met. ## ## Parameters: @@ -240,16 +244,19 @@ proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]): R rm.bloomFilter.add(msgId) # rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application? rm.processIncomingBuffer() - + return ok() except: return err(reInternalError) -proc setCallbacks*(rm: ReliabilityManager, - onMessageReady: proc(messageId: MessageID) {.gcsafe.}, - onMessageSent: proc(messageId: MessageID) {.gcsafe.}, - onMissingDependencies: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}, - onPeriodicSync: PeriodicSyncCallback = nil) = +proc setCallbacks*( + rm: ReliabilityManager, + onMessageReady: proc(messageId: MessageID) {.gcsafe.}, + onMessageSent: proc(messageId: MessageID) {.gcsafe.}, + onMissingDependencies: + proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}, + onPeriodicSync: PeriodicSyncCallback = nil, +) = ## Sets the callback functions for various events in the ReliabilityManager. ## ## Parameters: @@ -268,7 +275,7 @@ proc checkUnacknowledgedMessages*(rm: ReliabilityManager) {.raises: [].} = withLock rm.lock: let now = getTime() var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[] - + try: for unackMsg in rm.outgoingBuffer: let elapsed = now - unackMsg.sendTime @@ -298,7 +305,7 @@ proc periodicBufferSweep(rm: ReliabilityManager) {.async: (raises: [CancelledErr rm.cleanBloomFilter() except Exception as e: logError("Error in periodic buffer sweep: " & e.msg) - + await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds)) proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} = @@ -333,10 +340,9 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE rm.outgoingBuffer.setLen(0) rm.incomingBuffer.setLen(0) rm.bloomFilter = newRollingBloomFilter( - rm.config.bloomFilterCapacity, - rm.config.bloomFilterErrorRate, - rm.config.bloomFilterWindow + rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate, + rm.config.bloomFilterWindow, ) return ok() except: - return err(reInternalError) \ No newline at end of file + return err(reInternalError) diff --git a/src/reliability_utils.nim b/src/reliability_utils.nim index ec576dc..25abb6d 100644 --- a/src/reliability_utils.nim +++ b/src/reliability_utils.nim @@ -24,9 +24,10 @@ type channelId*: string config*: ReliabilityConfig lock*: Lock - onMessageReady*: proc(messageId: MessageID) - onMessageSent*: proc(messageId: MessageID) - onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) + onMessageReady*: proc(messageId: MessageID) {.gcsafe.} + onMessageSent*: proc(messageId: MessageID) {.gcsafe.} + onMissingDependencies*: + proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} onPeriodicSync*: proc() ReliabilityError* = enum