diff --git a/library/alloc.nim b/library/alloc.nim index e1ba4a4..b185dd7 100644 --- a/library/alloc.nim +++ b/library/alloc.nim @@ -30,6 +30,14 @@ proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] = return (cast[ptr UncheckedArray[T]](data), s.len) proc deallocSharedSeq*[T](s: var SharedSeq[T]) = + if not s.data.isNil: + when T is cstring: + # For array of cstrings, deallocate each string first + for i in 0 ..< s.len: + if not s.data[i].isNil: + # Deallocate each cstring + deallocShared(s.data[i]) + deallocShared(s.data) s.len = 0 @@ -48,6 +56,18 @@ proc allocSharedSeqFromCArray*[T](arr: ptr T, len: int): SharedSeq[T] = if arr.isNil or len <= 0: return (nil, 0) - let data = allocShared(sizeof(T) * len) - copyMem(data, arr, sizeof(T) * len) - return (cast[ptr UncheckedArray[T]](data), len) + when T is cstring: + # Special handling for arrays of cstrings + let data = cast[ptr UncheckedArray[cstring]](allocShared(sizeof(cstring) * len)) + let cstrArr = cast[ptr UncheckedArray[cstring]](arr) + + for i in 0 ..< len: + # Use the existing alloc proc to properly allocate each cstring + data[i] = cstrArr[i].alloc() + + return (data, len) + else: + # Original handling for non-cstring types + let data = allocShared(sizeof(T) * len) + copyMem(data, arr, sizeof(T) * len) + return (cast[ptr UncheckedArray[T]](data), len) diff --git a/library/libsds.h b/library/libsds.h index ff637d8..63a16c8 100644 --- a/library/libsds.h +++ b/library/libsds.h @@ -45,6 +45,13 @@ int UnwrapReceivedMessage(void* ctx, SdsCallBack callback, void* userData); +int MarkDependenciesMet(void* ctx, + char** messageIDs, + size_t count, + SdsCallBack callback, + void* userData); + + #ifdef __cplusplus } diff --git a/library/libsds.nim b/library/libsds.nim index 8fea804..2491850 100644 --- a/library/libsds.nim +++ b/library/libsds.nim @@ -14,7 +14,7 @@ import ./ffi_types, ./sds_thread/inter_thread_communication/sds_thread_request, ./sds_thread/inter_thread_communication/requests/ - [sds_lifecycle_request, sds_message_request], + [sds_lifecycle_request, sds_message_request, sds_dependencies_request], ../src/[reliability, reliability_utils, message] ################################################################################ @@ -242,5 +242,35 @@ proc UnwrapReceivedMessage( userData, ) +proc MarkDependenciesMet( + ctx: ptr SdsContext, + messageIds: pointer, + count: csize_t, + callback: SdsCallBack, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + + if messageIds == nil and count > 0: + let msg = "libsds error: " & "MessageIDs pointer is NULL but count > 0" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + var messageIds = allocSharedSeqFromCArray(cast[ptr cstring](messageIds), count.int) + + defer: + deallocSharedSeq(messageIds) + + handleRequest( + ctx, + RequestType.DEPENDENCIES, + SdsDependenciesRequest.createShared( + SdsDependenciesMsgType.MARK_DEPENDENCIES_MET, messageIds, count + ), + callback, + userData, + ) + ### End of exported procs ################################################################################ diff --git a/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim new file mode 100644 index 0000000..7c45087 --- /dev/null +++ b/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim @@ -0,0 +1,49 @@ +import std/[options, json, strutils, net, sequtils] +import chronos, chronicles, results, confutils, confutils/std/net + +import ../../../alloc +import ../../../../src/[reliability_utils, reliability, message] + +type SdsDependenciesMsgType* = enum + MARK_DEPENDENCIES_MET + +type SdsDependenciesRequest* = object + operation: SdsDependenciesMsgType + messageIds: SharedSeq[cstring] + count: csize_t + +proc createShared*( + T: type SdsDependenciesRequest, + op: SdsDependenciesMsgType, + messageIds: SharedSeq[cstring], + count: csize_t = 0, +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].messageIds = messageIds # check if alloc is needed + ret[].count = count + return ret + +proc destroyShared(self: ptr SdsDependenciesRequest) = + #deallocShared(self[].message) + #deallocShared(self[].messageId) + deallocShared(self) + +proc process*( + self: ptr SdsDependenciesRequest, rm: ptr ReliabilityManager +): Future[Result[string, string]] {.async.} = + defer: + destroyShared(self) + + case self.operation + of MARK_DEPENDENCIES_MET: + let messageIdsC = self.messageIds.toSeq() + let messageIds = messageIdsC.mapIt($it) + + markDependenciesMet(rm[], messageIds).isOkOr: + error "MARK_DEPENDENCIES_MET failed", error = error + return err("error processing MARK_DEPENDENCIES_MET request: " & $error) + + return ok("") + + return ok("") diff --git a/library/sds_thread/inter_thread_communication/sds_thread_request.nim b/library/sds_thread/inter_thread_communication/sds_thread_request.nim index 9ea8951..f40bef4 100644 --- a/library/sds_thread/inter_thread_communication/sds_thread_request.nim +++ b/library/sds_thread/inter_thread_communication/sds_thread_request.nim @@ -6,12 +6,13 @@ import std/json, results import chronos, chronos/threadsync import ../../ffi_types, - ./requests/[sds_lifecycle_request, sds_message_request], + ./requests/[sds_lifecycle_request, sds_message_request, sds_dependencies_request], ../../../src/[reliability_utils] type RequestType* {.pure.} = enum LIFECYCLE MESSAGE + DEPENDENCIES type SdsThreadRequest* = object reqType: RequestType @@ -68,6 +69,8 @@ proc process*( cast[ptr SdsLifecycleRequest](request[].reqContent).process(rm) of MESSAGE: cast[ptr SdsMessageRequest](request[].reqContent).process(rm) + of DEPENDENCIES: + cast[ptr SdsDependenciesRequest](request[].reqContent).process(rm) handleRes(await retFut, request)